Re: Not able to update collections
I am a beginner with Scala/Spark. Could you please elaborate on how to make an RDD of the results of func() and collect?

On Tue, Feb 24, 2015 at 2:27 PM, Sean Owen so...@cloudera.com wrote:
They aren't the same 'lst'. One is on your driver. It gets copied to executors when the tasks are executed. Those copies are updated, but the updates will never be reflected in the local copy back on the driver. You may just wish to make an RDD of the results of func() and collect() them back to the driver.

On Tue, Feb 24, 2015 at 7:20 PM, kvvt kvi...@vt.edu wrote:
I am working on the piece of code below.

var lst = scala.collection.mutable.MutableList[VertexId]()
graph.edges.groupBy[VertexId](f).foreach { edgesBySrc => lst ++= func(edgesBySrc) }
println(lst.length)

Here, the final println() always says that the length of the list is 0. The list is non-empty (it correctly prints the length of the returned list inside func()). I am not sure if I am doing the append correctly. Can someone point out what I am doing wrong?
Re: Movie Recommendation tutorial
It's something like the average error in rating, but a bit different -- it's the square root of the average squared error. But if you think of the ratings as 'stars', you could kind of think of 0.86 as 'generally off by 0.86 stars', and that would be roughly right. Whether that's good depends on the range of the input. For 1-5 that's OK; for 1-100 it would be fantastic. To give you a point of comparison, when Netflix launched their Netflix Prize, their recommender had an RMSE of 0.95 or so. The winning solution was at about 0.85. Their data set was a larger, harder problem than the MovieLens data set, though. So: reasonably good.

On Tue, Feb 24, 2015 at 8:19 PM, Krishna Sankar ksanka...@gmail.com wrote:
Yep, much better with 0.1. The best model was trained with rank = 12, lambda = 0.1, and numIter = 20, and its RMSE on the test set is 0.869092 (Spark 1.3.0).
Question: What is the intuition behind an RMSE of 0.86 vs 1.3? I know the smaller the better. But is it that much better? And what is a good number for a recommendation engine?
Cheers
k/
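For reference, a minimal Scala sketch of how RMSE can be computed from an RDD of (prediction, actual) rating pairs (the variable name is hypothetical, not from the tutorial):

// predictionsAndRatings: RDD[(Double, Double)] of (predicted, actual)
val mse = predictionsAndRatings
  .map { case (p, r) => (p - r) * (p - r) } // squared error per rating
  .mean()                                   // mean squared error
val rmse = math.sqrt(mse)                   // root mean squared error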
Re: Not able to update collections
Instead of

...foreach { edgesBySrc => lst ++= func(edgesBySrc) }

try

...flatMap { edgesBySrc => func(edgesBySrc) }

or even more succinctly

...flatMap(func)

This returns an RDD that basically has the list you are trying to build, I believe. You can collect() it to the driver, but beware if it is a huge data set. If you really just mean to count the results, you can count() instead.

On Tue, Feb 24, 2015 at 7:35 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:
I am a beginner with Scala/Spark. Could you please elaborate on how to make an RDD of the results of func() and collect?

On Tue, Feb 24, 2015 at 2:27 PM, Sean Owen so...@cloudera.com wrote:
They aren't the same 'lst'. One is on your driver. It gets copied to executors when the tasks are executed. Those copies are updated, but the updates will never be reflected in the local copy back on the driver. You may just wish to make an RDD of the results of func() and collect() them back to the driver.

On Tue, Feb 24, 2015 at 7:20 PM, kvvt kvi...@vt.edu wrote:
I am working on the piece of code below.

var lst = scala.collection.mutable.MutableList[VertexId]()
graph.edges.groupBy[VertexId](f).foreach { edgesBySrc => lst ++= func(edgesBySrc) }
println(lst.length)

Here, the final println() always says that the length of the list is 0. The list is non-empty (it correctly prints the length of the returned list inside func()). I am not sure if I am doing the append correctly. Can someone point out what I am doing wrong?
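Putting that suggestion together, a minimal sketch (assuming func returns a collection of VertexId):

val results = graph.edges.groupBy[VertexId](f).flatMap(edgesBySrc => func(edgesBySrc))
// bring the results back to the driver -- beware with huge data sets
val lst = results.collect()
println(lst.length)
// or, if only the size matters, skip the collect entirely:
println(results.count())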
Re: Not able to update collections
Thanks, but it still doesn't seem to work. Below is my entire code.

var mp = scala.collection.mutable.Map[VertexId, Int]()
var myRdd = graph.edges.groupBy[VertexId](f).flatMap { edgesBySrc => func(edgesBySrc, a, b) }
myRdd.foreach { node => mp(node) = 1 }

Values in mp do not get updated for any element in myRdd.

On Tue, Feb 24, 2015 at 2:39 PM, Sean Owen so...@cloudera.com wrote:
Instead of

...foreach { edgesBySrc => lst ++= func(edgesBySrc) }

try

...flatMap { edgesBySrc => func(edgesBySrc) }

or even more succinctly

...flatMap(func)

This returns an RDD that basically has the list you are trying to build, I believe. You can collect() it to the driver, but beware if it is a huge data set. If you really just mean to count the results, you can count() instead.

On Tue, Feb 24, 2015 at 7:35 PM, Vijayasarathy Kannan kvi...@vt.edu wrote:
I am a beginner with Scala/Spark. Could you please elaborate on how to make an RDD of the results of func() and collect?

On Tue, Feb 24, 2015 at 2:27 PM, Sean Owen so...@cloudera.com wrote:
They aren't the same 'lst'. One is on your driver. It gets copied to executors when the tasks are executed. Those copies are updated, but the updates will never be reflected in the local copy back on the driver. You may just wish to make an RDD of the results of func() and collect() them back to the driver.

On Tue, Feb 24, 2015 at 7:20 PM, kvvt kvi...@vt.edu wrote:
I am working on the piece of code below.

var lst = scala.collection.mutable.MutableList[VertexId]()
graph.edges.groupBy[VertexId](f).foreach { edgesBySrc => lst ++= func(edgesBySrc) }
println(lst.length)

Here, the final println() always says that the length of the list is 0. The list is non-empty (it correctly prints the length of the returned list inside func()). I am not sure if I am doing the append correctly. Can someone point out what I am doing wrong?
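The same driver-vs-executor copy issue applies to the map as to the original list: the foreach mutates executor-side copies of mp. A minimal driver-side alternative (a sketch, assuming the result fits in driver memory):

val mp = myRdd.map(node => (node, 1)).collectAsMap()
// mp is now an ordinary driver-local Map[VertexId, Int]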
Re: Movie Recommendation tutorial
Yep, much better with 0.1. The best model was trained with rank = 12, lambda = 0.1, and numIter = 20, and its RMSE on the test set is 0.869092 (Spark 1.3.0).
Question: What is the intuition behind an RMSE of 0.86 vs 1.3? I know the smaller the better. But is it that much better? And what is a good number for a recommendation engine?
Cheers
k/

On Tue, Feb 24, 2015 at 1:03 AM, Guillaume Charhon guilla...@databerries.com wrote:
I am using Spark 1.2.1. Thank you Krishna, I am getting almost the same results as you, so it must be an error in the tutorial. Xiangrui, I made some additional tests with lambda set to 0.1 and I am getting a much better RMSE:

RMSE (validation) = 0.868981 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.869628 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 1.361321 for the model trained with rank = 8, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.361321 for the model trained with rank = 8, lambda = 1.0, and numIter = 20.
RMSE (validation) = 3.755870 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.755870 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 0.866605 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.867498 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 1.361321 for the model trained with rank = 12, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.361321 for the model trained with rank = 12, lambda = 1.0, and numIter = 20.
RMSE (validation) = 3.755870 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.755870 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.

The best model was trained with rank = 12 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 0.865407.

On Tue, Feb 24, 2015 at 7:23 AM, Xiangrui Meng men...@gmail.com wrote:
Try to set lambda to 0.1. -Xiangrui

On Mon, Feb 23, 2015 at 3:06 PM, Krishna Sankar ksanka...@gmail.com wrote:
The RMSE varies a little bit between the versions. I partitioned the training, validation, and test sets like so:

training = ratings_rdd_01.filter(lambda x: (x[3] % 10) < 6)
validation = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 6 and (x[3] % 10) < 8)
test = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 8)

Validation MSE:
# 1.3.0 Mean Squared Error = 0.871456869392
# 1.2.1 Mean Squared Error = 0.877305629074

Itertools results:
1.3.0 - RMSE = 1.354839 (rank = 8, lambda = 1.0, and numIter = 20)
1.1.1 - RMSE = 1.335831 (rank = 8, lambda = 1.0, and numIter = 10)
Cheers
k/

On Mon, Feb 23, 2015 at 12:37 PM, Xiangrui Meng men...@gmail.com wrote:
Which Spark version did you use? Btw, there are three datasets from MovieLens. The tutorial used the medium one (1 million). -Xiangrui

On Mon, Feb 23, 2015 at 8:36 AM, poiuytrez guilla...@databerries.com wrote:
What do you mean?
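A minimal Scala sketch of this kind of grid search (variable names are hypothetical; training and validation are assumed to be RDDs of MLlib Rating objects):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

for (rank <- Seq(8, 12); lambda <- Seq(0.1, 1.0, 10.0); numIter <- Seq(10, 20)) {
  val model = ALS.train(training, rank, numIter, lambda)
  // predict the validation ratings and join them with the actual ones
  val predictions = model.predict(validation.map(r => (r.user, r.product)))
    .map(p => ((p.user, p.product), p.rating))
  val ratesAndPreds = validation.map(r => ((r.user, r.product), r.rating)).join(predictions)
  val rmse = math.sqrt(ratesAndPreds.map { case (_, (r, p)) => (r - p) * (r - p) }.mean())
  println(s"RMSE (validation) = $rmse for rank = $rank, lambda = $lambda, numIter = $numIter")
}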
Re: Missing shuffle files
If you're thinking of the YARN memory overhead, then yes, I have increased that as well. However, I'm glad to say that my job finally finished successfully. Besides the timeout and memory settings, performing repartitioning (with shuffling) at the right time seems to be the key to making this large job succeed. With all the transformations in the job, the partition distribution was becoming increasingly skewed. It's not easy to figure out when to repartition and how many partitions to use, and it takes forever to tweak these settings, since everything works perfectly for small datasets and you have to experiment with large, time-consuming jobs. Imagine if there were an automatic partition reconfiguration function that automagically did that...

On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet cjno...@gmail.com wrote:
I *think* this may have been related to the default memory overhead setting being too low. I raised the value to 1G and tried my job again, but I had to leave the office before it finished. It did get further, but I'm not exactly sure if that's just because I raised the memory. I'll see tomorrow -- but I have a suspicion this may have been the cause of the executors being killed by the application master.

On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:
I've got the opposite problem with regards to partitioning. I've got over 6000 partitions for some of these RDDs, which immediately blows the heap somehow -- I'm still not exactly sure how. If I coalesce them down to about 600-800 partitions, I get the problem where the executors are dying without any other error messages (other than telling me the executor was lost in the UI). If I don't coalesce, I pretty immediately get Java heap space exceptions that kill the job altogether. Putting in the timeouts didn't seem to help the case where I am coalescing. Also, I don't see any differences between 'disk only' and 'memory and disk' storage levels -- both of them have the same problems. I notice large shuffle files (30-40 GB) that only seem to spill a few hundred MB.

On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote:
Sounds very similar to what I experienced, Corey. Something that seems to at least help with my problems is to have more partitions. I'm already fighting between ending up with too many partitions in the end and having too few in the beginning. By coalescing as late as possible and avoiding too few partitions in the beginning, the problems seem to decrease. Also, by increasing spark.akka.askTimeout and spark.core.connection.ack.wait.timeout significantly (~700 secs), the problems seem to almost disappear. Don't want to celebrate yet, still a long way left before the job completes, but it's looking better...

On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:
I'm looking @ my YARN container logs for some of the executors which appear to be failing (with the missing shuffle files). I see exceptions that say client.TransportClientFactory: Found inactive connection to host/ip:port, closing it. Right after that I see shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks.
java.io.IOException: Failed to connect to host/ip:port. Right after that exception I see RECEIVED SIGNAL 15: SIGTERM. Finally, following the sigterm, I see FileNotFoundException: /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No such file or directory).
I'm looking @ the nodemanager and application master logs and I see no indications whatsoever that there were any memory issues during this period of time. The Spark UI is telling me none of the executors are really using too much memory when this happens. It is a big job that's caching several hundred GB, but each node manager on the cluster has 64 GB of RAM just for YARN containers (the physical nodes have 128 GB). On this cluster, we have 128 nodes. I've also tried using the DISK_ONLY storage level, but to no avail.
Any further ideas on how to track this down? Again, we're able to run this same job on about 1/5th of the data just fine. The only thing that's pointing me towards a memory issue is that it seems to be happening in the same stages each time, and when I lower the memory that each executor has allocated it happens in earlier stages, but I can't seem to find anything that says an executor (or container for that matter) has run low on memory.

On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:
No, unfortunately we're not making use of dynamic allocation or the external shuffle service. Hoping that we could reconfigure our cluster to make use of it, but since it requires changes to the cluster itself (and not just the Spark app), it could take some time. Unsure if task 450 was acting as a reducer or not, but it seems possible. Probably due to a crashed executor, as you say. Seems like I need to do some more advanced partition tuning to make this job work, as it's
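For reference, a minimal sketch of the timeout settings Anders mentioned earlier in this thread (the 700-second values are the ones he reported trying, not a general recommendation):

spark-submit \
  --conf spark.akka.askTimeout=700 \
  --conf spark.core.connection.ack.wait.timeout=700 \
  ... # remaining options and the application jar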
Running multiple threads with same Spark Context
Hi all, I have been running a simple SQL program on Spark. To test the concurrency, I created 10 threads inside the program, all threads using the same SQLContext object. When I ran the program on my EC2 cluster using spark-submit, only 3 threads were running in parallel. I have repeated the test on different EC2 clusters (containing different numbers of cores) and found that only 3 threads run in parallel on every cluster. Why is this behaviour seen? What does this number 3 specify? Is there any configuration parameter that I have to set if I want to run more threads concurrently?
Thanks
Harika
Re: RDD String foreach println
println occurs on the machine where the task executes, which may or may not be the same as your local driver process. collect()-ing brings data back to the driver, so printing there definitely occurs on the driver.

On Tue, Feb 24, 2015 at 9:48 AM, patcharee patcharee.thong...@uni.no wrote:
Hi, I would like to print the content of RDD[String]. I tried
1) linesWithSpark.foreach(println)
2) linesWithSpark.collect().foreach(println)
I submitted the job by spark-submit. 1) did not print, but 2) did. But when I used the shell, both 1) and 2) printed. Any ideas why 1) behaves differently on job submit and shell?
Best, Patcharee
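To make the distinction concrete, a minimal sketch (assuming lines: RDD[String]):

lines.foreach(println)           // runs on the executors; output lands in the executor stdout logs
lines.collect().foreach(println) // data is first brought to the driver, so it prints there
lines.take(10).foreach(println)  // a safer variant of collect() for large RDDs

(In the shell's default local mode, the driver and executors share one JVM, which is why both variants appear to print there.)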
Re: HiveContext in SparkSQL - concurrency issues
Hi Sreeharsha, My data is in HDFS. I am trying to use Spark's HiveContext (instead of SQLContext) to fire queries on my data, just because HiveContext supports more operations.

Sreeharsha wrote: "Change Derby to MySQL and check once; I faced the same issue too."

I am pretty new to Spark and Hive. I do not know how to change from Derby to MySQL. The log which I posted is from when I simply changed from SQLContext to HiveContext. Do I have to change any property in order to point HiveContext to use MySQL instead of Derby?
updateStateByKey and invFunction
So say I want to calculate the top K users visiting a page in the past 2 hours, updated every 5 mins. So here I want to maintain something like this:

Page_01 = {user_01:32, user_02:3, user_03:7...}
...

Basically a count of the number of times a user visited a page. Here my key is the page name/id and the state is the hashmap. Now in updateStateByKey I get the previous state and the new events coming *in* the window. Is there a way to also get the events going *out* of the window? This way I can incrementally update the state over a rolling window. What is the efficient way to do it in Spark Streaming?
Thanks
Ashish
Re: updateStateByKey and invFunction
You can use a reduceByKeyAndWindow with your specific time window. You can specify the inverse function in reduceByKeyAndWindow.

On Tue, Feb 24, 2015 at 1:36 PM, Ashish Sharma ashishonl...@gmail.com wrote:
So say I want to calculate the top K users visiting a page in the past 2 hours, updated every 5 mins. So here I want to maintain something like this:

Page_01 = {user_01:32, user_02:3, user_03:7...}
...

Basically a count of the number of times a user visited a page. Here my key is the page name/id and the state is the hashmap. Now in updateStateByKey I get the previous state and the new events coming *in* the window. Is there a way to also get the events going *out* of the window? This way I can incrementally update the state over a rolling window. What is the efficient way to do it in Spark Streaming?
Thanks
Ashish

--
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
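A minimal sketch of that suggestion, counting (page, user) visits over a 2-hour window that slides every 5 minutes (pageViews is a hypothetical DStream[(String, String)] of (page, user) events):

val visits = pageViews.map { case (page, user) => ((page, user), 1) }
val windowedCounts = visits.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // fold in counts entering the window
  (a: Int, b: Int) => a - b, // subtract counts leaving the window
  Minutes(120),              // window length: 2 hours
  Minutes(5))                // slide interval: 5 minutes

Note that the inverse-function variant requires checkpointing to be enabled on the StreamingContext.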
Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell
My issue is posted here on stack-overflow. What am I doing wrong here? http://stackoverflow.com/questions/28689186/facing-error-while-extending-scala-class-with-product-interface-to-overcome-limi
Re: On app upgrade, restore sliding window data.
I think this could be of some help to you: https://issues.apache.org/jira/browse/SPARK-3660

On Tue, Feb 24, 2015 at 2:18 AM, Matus Faro matus.f...@kik.com wrote:
Hi, Our application is being designed to operate at all times on a large sliding window (day+) of data. The operations performed on the window of data will change fairly frequently, and I need a way to save and restore the sliding window after an app upgrade without having to wait the duration of the sliding window to warm up. Because it's an app upgrade, checkpointing will not work, unfortunately. I can potentially dump the window to outside storage periodically or on app shutdown, but I don't have an ideal way of restoring it. I thought about two non-ideal solutions:
1. Load the previous data all at once into the sliding window on app startup. The problem is, at one point I will have double the data in the sliding window until the initial batch of data goes out of scope.
2. Broadcast the previous state of the window separately from the window. Perform the operations on both sets of data until it comes out of scope. The problem is, the data will not fit into memory.
Solutions that would solve my problem:
1. The ability to pre-populate the sliding window.
2. Having control over batch slicing. It would be nice for a Receiver to dictate the current batch timestamp in order to slow down or fast forward time.
Any feedback would be greatly appreciated! Thank you, Matus

--
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
RDD String foreach println
Hi, I would like to print the content of RDD[String]. I tried
1) linesWithSpark.foreach(println)
2) linesWithSpark.collect().foreach(println)
I submitted the job by spark-submit. 1) did not print, but 2) did. But when I used the shell, both 1) and 2) printed. Any ideas why 1) behaves differently on job submit and shell?
Best, Patcharee
Re: Getting to proto buff classes in Spark Context
I assume this is a difference between your local driver classpath and the remote worker classpath. It may not be a question of whether the class is there, but of classpath visibility issues. Have you looked into settings like spark.files.userClassPathFirst?

On Tue, Feb 24, 2015 at 4:43 AM, necro351 . necro...@gmail.com wrote:
Hello, I am trying to deserialize some data encoded using protocol buffers from within Spark and am getting class-not-found exceptions. I have narrowed the program down to something very simple that shows the problem exactly (see 'The Program' below) and hopefully someone can tell me the easy fix :)
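For reference, a sketch of passing that setting on the command line (spark.files.userClassPathFirst was an experimental flag in this Spark era, so treat this as something to try rather than a guaranteed fix):

spark-submit --conf spark.files.userClassPathFirst=true ... # remaining options and the application jar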
Re: Re: Many Receiver vs. Many threads per Receiver
Thanks Akhil. I'm not sure whether the low-level consumer will be officially supported by Spark Streaming. So far, I don't see it mentioned/documented in the Spark Streaming programming guide.
bit1...@163.com

From: Akhil Das
Date: 2015-02-24 16:21
To: bit1...@163.com
CC: user
Subject: Re: Many Receiver vs. Many threads per Receiver
I believe when you go with 1, it will distribute the consumers across your cluster (possibly on 6 machines), but I still don't see a way to tell it which partition to consume from, etc. If you are looking for a consumer where you can specify the partition details and all, then you are better off with the low-level consumer.
Thanks
Best Regards

On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com bit1...@163.com wrote:
Hi, I am experimenting with Spark Streaming and Kafka integration. To read messages from Kafka in parallel, there are basically two ways:
1. Create many receivers, like (1 to 6).map(_ => KafkaUtils.createStream(...)).
2. Specify many threads when calling KafkaUtils.createStream, like val topicMap = Map("myTopic" -> 6); this will create one receiver with 6 reading threads.
My question is which option is better. Option 2 sounds better to me because it saves a lot of cores (one receiver occupies one core), but I learned from somewhere else that choice 1 is better, so I would like to ask and see how you guys elaborate on this. Thanks
bit1...@163.com
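A minimal sketch of the two approaches side by side (ssc, zkQuorum, group and the topic name are placeholders):

// 1. Many receivers (occupies up to 6 cores), unioned into one stream
val streams = (1 to 6).map(_ => KafkaUtils.createStream(ssc, zkQuorum, group, Map("myTopic" -> 1)))
val unified = ssc.union(streams)

// 2. One receiver (one core) with 6 consumer threads
val single = KafkaUtils.createStream(ssc, zkQuorum, group, Map("myTopic" -> 6))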
Re: Use case for data in SQL Server
There is a newly introduced JDBC data source in Spark 1.3.0 (not the JdbcRDD in Spark core), which may be useful. However, currently there is no SQL Server specific logic implemented. I'd assume standard SQL queries should work.
Cheng

On 2/24/15 7:02 PM, Suhel M wrote:
Hey, I am trying to work out the best way we can leverage Spark for crunching data that is sitting in SQL Server databases. The ideal scenario is being able to work efficiently with big data (10 billion+ rows of activity data). We need to shape this data for machine learning problems, and we want to do ad-hoc complex queries and get results in a timely manner. All our data crunching is done via SQL/MDX queries, but these obviously take a very long time to run over a large data size. Also, we currently don't have Hadoop or any other distributed storage. Keen to hear feedback/thoughts/war stories from the Spark community on the best way to approach this situation.
Thanks Suhel
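A rough sketch of what querying SQL Server through the 1.3 JDBC data source could look like in Scala (the connection URL and table name are placeholders, and a SQL Server JDBC driver still has to be on the classpath):

val df = sqlContext.load("jdbc", Map(
  "url"     -> "jdbc:sqlserver://host:1433;databaseName=mydb", // placeholder URL
  "dbtable" -> "dbo.activity"))                                // placeholder table
df.registerTempTable("activity")
sqlContext.sql("SELECT COUNT(*) FROM activity").show()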
Re: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell
Did you happen to have a look at https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
Thanks
Best Regards

On Tue, Feb 24, 2015 at 3:39 PM, anu anamika.guo...@gmail.com wrote:
My issue is posted here on stack-overflow. What am I doing wrong here? http://stackoverflow.com/questions/28689186/facing-error-while-extending-scala-class-with-product-interface-to-overcome-limi
New guide on how to write a Spark job in Clojure
Hi all, Maybe some of you are interested: I wrote a new guide on how to start using Spark from Clojure. The tutorial covers:
* setting up a project,
* doing REPL- or test-driven development of Spark jobs,
* running Spark jobs locally.
Just read it at https://gorillalabs.github.io/sparkling/articles/tfidf_guide.html. Comments (and pull requests) are very welcome.
Sincerely
Chris
Spark on EC2
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Spark on EC2
The free tier includes 750 hours of t2.micro instance time per month. http://aws.amazon.com/free/ That's basically a month of hours, so it's all free if you run only one instance at a time. If you run 4, you'll be able to run your cluster of 4 for about a week free. A t2.micro has 1GB of memory, which is small but something you could possibly get work done with. However, it provides only burst CPU: you can only use about 10% of 1 vCPU continuously due to capping. Imagine this as about 1/10th of 1 core on your laptop. It would be incredibly slow. This is not to mention the network and I/O bottlenecks you're likely to run into, as you don't get much provisioning with these free instances. So, no, you really can't use this for anything that is at all CPU intensive. It's for, say, running a low-traffic web service.

On Tue, Feb 24, 2015 at 2:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Spark on EC2
Thank You Sean. I was just trying to experiment with the performance of Spark applications with various worker instances (I hope you remember that we discussed the worker instances). I thought it would be a good one to try on EC2. So, it doesn't work out, does it? Thank You

On Tue, Feb 24, 2015 at 8:40 PM, Sean Owen so...@cloudera.com wrote:
The free tier includes 750 hours of t2.micro instance time per month. http://aws.amazon.com/free/ That's basically a month of hours, so it's all free if you run only one instance at a time. If you run 4, you'll be able to run your cluster of 4 for about a week free. A t2.micro has 1GB of memory, which is small but something you could possibly get work done with. However, it provides only burst CPU: you can only use about 10% of 1 vCPU continuously due to capping. Imagine this as about 1/10th of 1 core on your laptop. It would be incredibly slow. This is not to mention the network and I/O bottlenecks you're likely to run into, as you don't get much provisioning with these free instances. So, no, you really can't use this for anything that is at all CPU intensive. It's for, say, running a low-traffic web service.

On Tue, Feb 24, 2015 at 2:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Pyspark save Decison Tree Module with joblib/pickle
Great to know, thanks Xiangrui.

Sebastián Ramírez
Algorithm Designer
http://www.senseta.com
Tel: (+571) 795 7950 ext: 1012 Cel: (+57) 300 370 77 10
Calle 73 No 7 - 06 Piso 4
LinkedIn: co.linkedin.com/in/tiangolo/ Twitter: @tiangolo https://twitter.com/tiangolo
Email: sebastian.rami...@senseta.com
www.senseta.com

On Tue, Feb 24, 2015 at 1:23 AM, Xiangrui Meng men...@gmail.com wrote:
FYI, in 1.3 we support save/load of tree models in Scala and Java. We will add save/load support to Python soon. -Xiangrui

On Mon, Feb 23, 2015 at 2:57 PM, Sebastián Ramírez sebastian.rami...@senseta.com wrote:
In your log it says:

pickle.PicklingError: Can't pickle <type 'thread.lock'>: it's not found as thread.lock

As far as I know, you can't pickle Spark models. If you go to the documentation for pickle, you can see that you can pickle only simple Python structures and code (written in Python), at least as I understand it: https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled
To save a model you can call:

model.toDebugString()
http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.tree.DecisionTreeModel.toDebugString

That gives you a string in pseudo-code that you can save to a file. Then, you can parse that pseudo-code to write a proper script that runs the decision tree. Actually, that's what I did for a random forest (an ensemble of decision trees).
Hope that helps,
Sebastián Ramírez

On Mon, Feb 23, 2015 at 4:55 AM, Jaggu jagana...@gmail.com wrote:
Hi Team, I was trying to save a DecisionTree model from PySpark using joblib. It is giving me the following error: http://pastebin.com/82CFhPNn . Any clue how to resolve the same or save a model?
Best regards
Jagan
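For the Scala/Java save/load that Xiangrui mentions, a minimal 1.3-style sketch (the HDFS path is a placeholder):

import org.apache.spark.mllib.tree.model.DecisionTreeModel

model.save(sc, "hdfs:///models/myTree")                             // persist the trained model
val sameModel = DecisionTreeModel.load(sc, "hdfs:///models/myTree") // restore it later
println(sameModel.toDebugString)                                    // the human-readable form discussed above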
Re: Spark on EC2
No, I think I am OK with the time it takes. Just that, with the increase in partitions along with the increase in the number of workers, I want to see the improvement in the performance of an application. I just want to see this happen. Any comments? Thank You

On Tue, Feb 24, 2015 at 8:52 PM, Sean Owen so...@cloudera.com wrote:
You can definitely, easily, try a 1-node standalone cluster for free. Just don't be surprised when the CPU capping kicks in within about 5 minutes of any non-trivial computation and suddenly the instance is very s-l-o-w. I would consider just paying the ~$0.07/hour to play with an m3.medium, which ought to be pretty OK for basic experimentation.

On Tue, Feb 24, 2015 at 3:14 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Thank You Sean. I was just trying to experiment with the performance of Spark applications with various worker instances (I hope you remember that we discussed the worker instances). I thought it would be a good one to try on EC2. So, it doesn't work out, does it? Thank You

On Tue, Feb 24, 2015 at 8:40 PM, Sean Owen so...@cloudera.com wrote:
The free tier includes 750 hours of t2.micro instance time per month. http://aws.amazon.com/free/ That's basically a month of hours, so it's all free if you run only one instance at a time. If you run 4, you'll be able to run your cluster of 4 for about a week free. A t2.micro has 1GB of memory, which is small but something you could possibly get work done with. However, it provides only burst CPU: you can only use about 10% of 1 vCPU continuously due to capping. Imagine this as about 1/10th of 1 core on your laptop. It would be incredibly slow. This is not to mention the network and I/O bottlenecks you're likely to run into, as you don't get much provisioning with these free instances. So, no, you really can't use this for anything that is at all CPU intensive. It's for, say, running a low-traffic web service.

On Tue, Feb 24, 2015 at 2:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Sharing Spark Drivers
Hi John, This would be a potential application for the Spark Kernel project (https://github.com/ibm-et/spark-kernel). The Spark Kernel serves as your driver application, allowing you to feed it snippets of code (or load up entire jars via magics) in Scala to execute against a Spark cluster. Although not technically supported, you can connect multiple applications to the same Spark Kernel instance to use the same resources (both on the cluster and on the driver). If you're curious, you can find a getting started section here: https://github.com/ibm-et/spark-kernel/wiki/Getting-Started-with-the-Spark-Kernel
Signed,
Chip Senkbeil

On Tue Feb 24 2015 at 8:04:08 AM John Omernik j...@omernik.com wrote:
I have been posting on the Mesos list, as I am looking to see if it's possible or not to share Spark drivers. Obviously, in standalone cluster mode, the Master handles requests, and you can connect a new SparkContext to a currently running master. However, in Mesos (and perhaps YARN) I don't see how this is possible. I guess I am curious why. It could make quite a bit of sense to have one driver act as a master, running as a certain user (ideally running out in the Mesos cluster, which I believe Tim Chen is working on). That driver could belong to a user, and be used as a long-term, resource-controlled instance that the user could use for ad-hoc queries. Running many little drivers out on the cluster seems to be a waste of driver resources, as each driver would be using the same resources, and rarely would many be used at once (if they were for a user's ad-hoc environment). Additionally, the advantages of the shared driver seem to play out for a user as they come back to the environment over and over again. Does this make sense? I really want to try to understand how looking at it this way is wrong, either from a Spark paradigm perspective or a technological perspective. I will grant that I am coming from a traditional background, so some of the older ideas for how to set things up may be creeping into my thinking, but if that's the case, I'd love to understand better. Thanks!
John
Re: Spark on EC2
Hi, I am sorry that I made a mistake about the AWS tariff. You can read the email from Sean Owen, which better explains the strategies for running Spark on AWS. For your question: it means that you just download Spark and unzip it, then run the Spark shell with ./bin/spark-shell or ./bin/pyspark. It is useful for getting familiar with Spark. You can do this on your laptop as well as on EC2. In fact, running ./ec2/spark-ec2 means launching Spark standalone mode on a cluster; you can find more details here: https://spark.apache.org/docs/latest/spark-standalone.html
Cheers
Gen

On Tue, Feb 24, 2015 at 4:07 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Kindly bear with my questions as I am new to this. "If you run Spark in local mode on one EC2 machine" -- What does this mean? Is it that I launch a Spark cluster from my local machine, i.e., by running the shell script that is there in /spark/ec2?

On Tue, Feb 24, 2015 at 8:32 PM, gen tang gen.tan...@gmail.com wrote:
Hi, As a real Spark cluster needs at least one master and one slave, you need to launch two machines, so the second machine is not free. However, if you run Spark in local mode on one EC2 machine, it is free. The charge for AWS depends on how many machines you launch and the types of those machines, not on the utilisation of the machines. Hope this helps.
Cheers
Gen

On Tue, Feb 24, 2015 at 3:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: updateStateByKey and invFunction
But how will I specify my state there?

On Tue, Feb 24, 2015 at 12:50 AM Arush Kharbanda ar...@sigmoidanalytics.com wrote:
You can use a reduceByKeyAndWindow with your specific time window. You can specify the inverse function in reduceByKeyAndWindow.

On Tue, Feb 24, 2015 at 1:36 PM, Ashish Sharma ashishonl...@gmail.com wrote:
So say I want to calculate the top K users visiting a page in the past 2 hours, updated every 5 mins. So here I want to maintain something like this:

Page_01 = {user_01:32, user_02:3, user_03:7...}
...

Basically a count of the number of times a user visited a page. Here my key is the page name/id and the state is the hashmap. Now in updateStateByKey I get the previous state and the new events coming *in* the window. Is there a way to also get the events going *out* of the window? This way I can incrementally update the state over a rolling window. What is the efficient way to do it in Spark Streaming?
Thanks
Ashish

--
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Spark on EC2
Thank You Akhil. Will look into it. It's free, isn't it? I am still a student :)

On Tue, Feb 24, 2015 at 9:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
If you sign up for Google Compute Cloud, you will get free $300 credits for 3 months, and you can start a pretty good cluster for your testing purposes. :)
Thanks
Best Regards

On Tue, Feb 24, 2015 at 8:25 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Use case for data in SQL Server
Hi Suhel, My team is currently working with a lot of SQL Server databases as one of our many data sources, and ultimately we pull the data into HDFS from SQL Server. As we had a lot of SQL databases to hit, we used the jTDS driver and Sqoop to extract the data out of SQL Server and into HDFS (a small hit against the SQL databases to extract the data). The reasons we did this were 1) to minimize the impact on our SQL Servers, since these were transactional databases and we didn't want our analytics queries to interfere with the transactions, and 2) having the data within HDFS allowed us to centralize our relational source data in one location so we could join / mash it with other sources of data more easily. Now that the data is there, we just run our Spark queries against that, and it's humming nicely. Saying this -- I have not yet had a chance to try the Spark 1.3 JDBC data sources. Cheng, to confirm, the reference for JDBC is http://people.apache.org/~pwendell/spark-1.3.0-snapshot1-docs/api/java/org/apache/spark/sql/jdbc/package-tree.html ? In the past I have not been able to get SQL queries to run against SQL Server without the use of the jTDS or Microsoft SQL Server JDBC driver, for various reasons (e.g. authentication, T-SQL vs. ANSI-SQL differences, etc.). If I needed to utilize an additional driver like jTDS, can I plug it in with the JDBC source and/or potentially build something that will work with the Data Sources API?
Thanks!
Denny

On Tue Feb 24 2015 at 3:20:57 AM Cheng Lian lian.cs@gmail.com wrote:
There is a newly introduced JDBC data source in Spark 1.3.0 (not the JdbcRDD in Spark core), which may be useful. However, currently there is no SQL Server specific logic implemented. I'd assume standard SQL queries should work.
Cheng

On 2/24/15 7:02 PM, Suhel M wrote:
Hey, I am trying to work out the best way we can leverage Spark for crunching data that is sitting in SQL Server databases. The ideal scenario is being able to work efficiently with big data (10 billion+ rows of activity data). We need to shape this data for machine learning problems, and we want to do ad-hoc complex queries and get results in a timely manner. All our data crunching is done via SQL/MDX queries, but these obviously take a very long time to run over a large data size. Also, we currently don't have Hadoop or any other distributed storage. Keen to hear feedback/thoughts/war stories from the Spark community on the best way to approach this situation.
Thanks Suhel
Re: Spark on EC2
You can definitely, easily, try a 1-node standalone cluster for free. Just don't be surprised when the CPU capping kicks in within about 5 minutes of any non-trivial computation and suddenly the instance is very s-l-o-w. I would consider just paying the ~$0.07/hour to play with an m3.medium, which ought to be pretty OK for basic experimentation.

On Tue, Feb 24, 2015 at 3:14 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Thank You Sean. I was just trying to experiment with the performance of Spark applications with various worker instances (I hope you remember that we discussed the worker instances). I thought it would be a good one to try on EC2. So, it doesn't work out, does it? Thank You

On Tue, Feb 24, 2015 at 8:40 PM, Sean Owen so...@cloudera.com wrote:
The free tier includes 750 hours of t2.micro instance time per month. http://aws.amazon.com/free/ That's basically a month of hours, so it's all free if you run only one instance at a time. If you run 4, you'll be able to run your cluster of 4 for about a week free. A t2.micro has 1GB of memory, which is small but something you could possibly get work done with. However, it provides only burst CPU: you can only use about 10% of 1 vCPU continuously due to capping. Imagine this as about 1/10th of 1 core on your laptop. It would be incredibly slow. This is not to mention the network and I/O bottlenecks you're likely to run into, as you don't get much provisioning with these free instances. So, no, you really can't use this for anything that is at all CPU intensive. It's for, say, running a low-traffic web service.

On Tue, Feb 24, 2015 at 2:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Spark on EC2
This should help you understand the cost of running a Spark cluster for a short period of time: http://www.ec2instances.info/ If you run an instance for even 1 second of a single hour, you are charged for that complete hour. So before you shut down your miniature cluster, make sure you really are done with what you want to do, as firing up the cluster again will be like using an extra hour's worth of time. The purpose of EC2's free tier is to get you to buy into AWS services. At the free level it's not terribly useful except for the simplest of web applications (which you could host on Heroku -- which also uses AWS -- for free) or simple long-running but largely dormant shell processes.

On Tue Feb 24 2015 at 10:16:56 AM Deep Pradhan pradhandeep1...@gmail.com wrote:
Thank You Sean. I was just trying to experiment with the performance of Spark applications with various worker instances (I hope you remember that we discussed the worker instances). I thought it would be a good one to try on EC2. So, it doesn't work out, does it? Thank You

On Tue, Feb 24, 2015 at 8:40 PM, Sean Owen so...@cloudera.com wrote:
The free tier includes 750 hours of t2.micro instance time per month. http://aws.amazon.com/free/ That's basically a month of hours, so it's all free if you run only one instance at a time. If you run 4, you'll be able to run your cluster of 4 for about a week free. A t2.micro has 1GB of memory, which is small but something you could possibly get work done with. However, it provides only burst CPU: you can only use about 10% of 1 vCPU continuously due to capping. Imagine this as about 1/10th of 1 core on your laptop. It would be incredibly slow. This is not to mention the network and I/O bottlenecks you're likely to run into, as you don't get much provisioning with these free instances. So, no, you really can't use this for anything that is at all CPU intensive. It's for, say, running a low-traffic web service.

On Tue, Feb 24, 2015 at 2:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Spark on EC2
If you sign up for Google Compute Cloud, you will get free $300 credits for 3 months, and you can start a pretty good cluster for your testing purposes. :)
Thanks
Best Regards

On Tue, Feb 24, 2015 at 8:25 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Spark on EC2
Yes it is :)
Thanks
Best Regards

On Tue, Feb 24, 2015 at 9:09 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Thank You Akhil. Will look into it. It's free, isn't it? I am still a student :)

On Tue, Feb 24, 2015 at 9:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
If you sign up for Google Compute Cloud, you will get free $300 credits for 3 months, and you can start a pretty good cluster for your testing purposes. :)
Thanks
Best Regards

On Tue, Feb 24, 2015 at 8:25 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Spark on EC2
Kindly bear with my questions as I am new to this. "If you run Spark in local mode on one EC2 machine" -- What does this mean? Is it that I launch a Spark cluster from my local machine, i.e., by running the shell script that is there in /spark/ec2?

On Tue, Feb 24, 2015 at 8:32 PM, gen tang gen.tan...@gmail.com wrote:
Hi, As a real Spark cluster needs at least one master and one slave, you need to launch two machines, so the second machine is not free. However, if you run Spark in local mode on one EC2 machine, it is free. The charge for AWS depends on how many machines you launch and the types of those machines, not on the utilisation of the machines. Hope this helps.
Cheers
Gen

On Tue, Feb 24, 2015 at 3:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Spark on EC2
Hi, As a real Spark cluster needs at least one master and one slave, you need to launch two machines, so the second machine is not free. However, if you run Spark in local mode on one EC2 machine, it is free. The charge for AWS depends on how many machines you launch and the types of those machines, not on the utilisation of the machines. Hope this helps.
Cheers
Gen

On Tue, Feb 24, 2015 at 3:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Spark on EC2
Thank You All. I think I will look into paying ~$0.07/hr as Sean suggested.

On Tue, Feb 24, 2015 at 9:01 PM, gen tang gen.tan...@gmail.com wrote:
Hi, I am sorry that I made a mistake about the AWS tariff. You can read the email from Sean Owen, which better explains the strategies for running Spark on AWS. For your question: it means that you just download Spark and unzip it, then run the Spark shell with ./bin/spark-shell or ./bin/pyspark. It is useful for getting familiar with Spark. You can do this on your laptop as well as on EC2. In fact, running ./ec2/spark-ec2 means launching Spark standalone mode on a cluster; you can find more details here: https://spark.apache.org/docs/latest/spark-standalone.html
Cheers
Gen

On Tue, Feb 24, 2015 at 4:07 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Kindly bear with my questions as I am new to this. "If you run Spark in local mode on one EC2 machine" -- What does this mean? Is it that I launch a Spark cluster from my local machine, i.e., by running the shell script that is there in /spark/ec2?

On Tue, Feb 24, 2015 at 8:32 PM, gen tang gen.tan...@gmail.com wrote:
Hi, As a real Spark cluster needs at least one master and one slave, you need to launch two machines, so the second machine is not free. However, if you run Spark in local mode on one EC2 machine, it is free. The charge for AWS depends on how many machines you launch and the types of those machines, not on the utilisation of the machines. Hope this helps.
Cheers
Gen

On Tue, Feb 24, 2015 at 3:55 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:
Hi, I have just signed up for Amazon AWS because I learnt that it provides service for free for the first 12 months. I want to run Spark on an EC2 cluster. Will they charge me for this? Thank You
Re: Accumulator in SparkUI for streaming
Interesting. Accumulators are shown on the web UI if you are using the ordinary SparkContext (Spark 1.2). It just has to be named (and that's what you did).

scala> val acc = sc.accumulator(0, "test accumulator")
acc: org.apache.spark.Accumulator[Int] = 0
scala> val rdd = sc.parallelize(1 to 1000)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> rdd.foreach(x => acc += 1)
scala> acc.value
res1: Int = 1000

The Stage details page then shows the named accumulator and its value.

On 20.2.2015. 9:25, Tim Smith wrote:
On Spark 1.2: I am trying to capture # records read from a kafka topic:

val inRecords = ssc.sparkContext.accumulator(0, "InRecords")
..
kInStreams.foreach( k => { k.foreachRDD ( rdd => inRecords += rdd.count().toInt ) } )
inRecords.value

Question is how do I get the accumulator to show up in the UI? I tried inRecords.value but that didn't help. Pretty sure it isn't showing up in Stage metrics. What's the trick here? collect?
Thanks,
Tim
Re: Memory problems when calling pipe()
Hi, I finally solved the problem by setting spark.yarn.executor.memoryOverhead with the option --conf spark.yarn.executor.memoryOverhead= for spark-submit, as pointed out in http://stackoverflow.com/questions/28404714/yarn-why-doesnt-task-go-out-of-heap-space-but-container-gets-killed and https://issues.apache.org/jira/browse/SPARK-2444, and now it works OK.
Greetings,
Juan

2015-02-23 10:40 GMT+01:00 Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com:
Hi, I'm having problems using pipe() from a Spark program written in Java, where I call a Python script, running in a YARN cluster. The problem is that the job fails when YARN kills the container because the Python script goes beyond the memory limits. I get something like this in the log:

01_04. Exit status: 143. Diagnostics: Container [pid=6976,containerID=container_1424279690678_0078_01_04] is running beyond physical memory limits. Current usage: 7.5 GB of 7.5 GB physical memory used; 8.6 GB of 23.3 GB virtual memory used. Killing container.
Dump of the process-tree for container_1424279690678_0078_01_04:
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 6976 1457 6976 6976 (bash) 0 0 108613632 338 /bin/bash -c /usr/java/jdk1.7.0_71/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms7048m -Xmx7048m -Djava.io.tmpdir=/mnt/data1/hadoop/yarn/local/usercache/root/appcache/application_1424279690678_0078/container_1424279690678_0078_01_04/tmp '-Dspark.driver.port=33589' '-Dspark.ui.port=0' -Dspark.yarn.app.container.log.dir=/mnt/data1/hadoop/yarn/log/application_1424279690678_0078/container_1424279690678_0078_01_04 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkdri...@slave3.lambdoop.com:33589/user/CoarseGrainedScheduler 5 slave1.lambdoop.com 1 application_1424279690678_0078 1 /mnt/data1/hadoop/yarn/log/application_1424279690678_0078/container_1424279690678_0078_01_04/stdout 2 /mnt/data1/hadoop/yarn/log/application_1424279690678_0078/container_1424279690678_0078_01_04/stderr
|- 10513 6982 6976 6976 (python2.7) 9308 1224 448360448 13857 /usr/local/bin/python2.7 /mnt/my_script.py my_args
|- 6982 6976 6976 6976 (java) 115176 12032 8632229888 1951974 /usr/java/jdk1.7.0_71/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms7048m -Xmx7048m -Djava.io.tmpdir=/mnt/data1/hadoop/yarn/local/usercache/root/appcache/application_1424279690678_0078/container_1424279690678_0078_01_04/tmp -Dspark.driver.port=33589 -Dspark.ui.port=0 -Dspark.yarn.app.container.log.dir=/mnt/data1/hadoop/yarn/log/application_1424279690678_0078/container_1424279690678_0078_01_04 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkdri...@slave3.lambdoop.com:33589/user/CoarseGrainedScheduler 5 slave1.lambdoop.com 1 application_1424279690678_0078
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

I find this strange because the Python script processes each input line separately and makes a simple, independent calculation per line: it basically parses the line, calculates the Haversine distance, and returns a double value. Input lines are traversed in Python with a 'for line in sys.stdin' loop. Also, to avoid memory leaks in Python:
- I call sys.stdout.flush() for each output line generated by Python.
- I call the following function after writing each output line, to force garbage collection regularly in Python:

_iterations_until_gc = 1000
iterations_since_gc = 0

def update_garbage_collector():
    global iterations_since_gc
    if iterations_since_gc >= _iterations_until_gc:
        gc.collect()
        iterations_since_gc = 0
    else:
        iterations_since_gc += 1

So the memory consumption of the script should be constant, but in practice it looks like there is some memory leak. Maybe Spark is introducing some memory leak when redirecting the IO in pipe()? Have any of you experienced similar situations when using pipe in Spark? Also, do you know how I could control the amount of memory reserved for the subprocess that is created by pipe? I understand that with --executor-memory I set the memory for the Spark executor process, but not for the subprocess created by pipe. Thanks in advance for your help.
Greetings,
Juan
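For reference, a minimal sketch of the fix described at the top of this thread (the 2048 MB value is purely illustrative; the right overhead depends on how much memory the piped subprocess needs):

spark-submit \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --executor-memory 7G \
  ... # remaining options and the application jar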
Sharing Spark Drivers
I have been posting on the Mesos list, as I am looking to see whether it's possible or not to share spark drivers. Obviously, in standalone cluster mode, the Master handles requests, and you can instantiate a new SparkContext against a currently running master. However in Mesos (and perhaps Yarn) I don't see how this is possible. I guess I am curious as to why? It could make quite a bit of sense to have one driver act as a master, running as a certain user (ideally running out in the Mesos cluster, which I believe Tim Chen is working on). That driver could belong to a user, and be used as a long-term, resource-controlled instance that the user could use for ad hoc queries. Running many little drivers out on the cluster seems to be a waste of driver resources, as each driver would be using the same resources, and rarely would many be used at once (if they were for a user's ad hoc environment). Additionally, the advantages of the shared driver seem to play out for a user as they come back to the environment over and over again. Does this make sense? I really want to try to understand how looking at it this way is wrong, either from a Spark paradigm perspective or a technological perspective. I will grant that I am coming from a traditional background, so some of the older ideas for how to set things up may be creeping into my thinking, but if that's the case, I'd love to understand better. Thanks! John - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark 1.3 dataframe documentation
Hello, I have built Spark 1.3. I can successfully use the DataFrame API. However, I am not able to find its API documentation for Python. Do you know when the documentation will be available? Best Regards, poiuytrez -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-dataframe-documentation-tp21789.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Get filename in Spark Streaming
Hello Subacini, Until someone more knowledgeable suggests a better, more straightforward, and simpler approach with a working code snippet, I suggest the following workaround / hack:

inputStream.foreachRDD { rdd =>
  val myStr = rdd.toDebugString
  // process the myStr string value, e.g. using regular expressions
}

For example, if you print myStr, you can see something similar to this in your log / console output:

15/02/24 15:14:56 INFO FileInputFormat: Total input paths to process : 1
15/02/24 15:14:56 INFO JobScheduler: Added jobs for time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Starting job streaming job 1424787295000 ms.0 from job set of time 1424787295000 ms
(20) MappedRDD[27] at textFileStream at kmeans.scala:17 []
 |  UnionRDD[26] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL14.json NewHadoopRDD[6] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL11.json NewHadoopRDD[7] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL10.json NewHadoopRDD[8] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL6.json NewHadoopRDD[9] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL8.json NewHadoopRDD[10] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL5.json NewHadoopRDD[11] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL1.json NewHadoopRDD[12] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL9.json NewHadoopRDD[13] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL2.json NewHadoopRDD[14] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL16.json NewHadoopRDD[15] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL20.json NewHadoopRDD[16] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL12.json NewHadoopRDD[17] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL4.json NewHadoopRDD[18] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL19.json NewHadoopRDD[19] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL7.json NewHadoopRDD[20] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL17.json NewHadoopRDD[21] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL18.json NewHadoopRDD[22] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL3.json NewHadoopRDD[23] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL13.json NewHadoopRDD[24] at textFileStream at kmeans.scala:17 []
 |  file:/home/emre/data/train/newsMessageNL15.json NewHadoopRDD[25] at textFileStream at kmeans.scala:17 []
15/02/24 15:14:56 INFO JobScheduler: Finished job streaming job 1424787295000 ms.0 from job set of time 1424787295000 ms
15/02/24 15:14:56 INFO JobScheduler: Total delay: 1.420 s for time 1424787295000 ms (execution: 0.051 s)
15/02/24 15:14:56 INFO MappedRDD: Removing RDD 5 from persistence list
15/02/24 15:14:56 INFO BlockManager: Removing RDD 5
15/02/24 15:14:56 INFO FileInputDStream: Cleared 0 old files that were older than 1424787235000 ms:
15/02/24 15:14:56 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

You can process the string to retrieve each section that starts with file: and ends with a space.
Then for each such string you can get your timestamp from the file name. -- Emre Sevinç http://www.bigindustries.be/

On Fri, Feb 6, 2015 at 9:33 PM, Subacini B subac...@gmail.com wrote: Thank you Emre, this helps, I am able to get the filename. But I am not sure how to fit this into the DStream RDDs.

val inputStream = ssc.textFileStream("/hdfs Path/")

inputStream is a DStream of RDDs, and in foreachRDD I am doing my processing:

inputStream.foreachRDD { rdd =>
  // how to get filename here??
}

Can you please help. On Thu, Feb 5, 2015 at 11:15 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, Did you check the following? http://themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/spark-input-filename/ http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-td6551.html -- Emre Sevinç

On Fri, Feb 6, 2015 at 2:16 AM, Subacini B subac...@gmail.com wrote: Hi All, We have filenames with a timestamp, say ABC_1421893256000.txt, and the timestamp needs to be extracted from the file name for further processing. Is there a way to get the input file name picked up by the Spark Streaming job? Thanks in advance, Subacini

-- Emre Sevinç
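To make the hack concrete, here is a sketch (untested) that assumes the ABC_1421893256000.txt naming from Subacini's question and the inputStream defined above; it scrapes "file:..." paths out of toDebugString, whose output format is not a stable API:

val pathPattern = """file:(\S+)""".r
val tsPattern = """ABC_(\d+)\.txt""".r

inputStream.foreachRDD { rdd =>
  // pull every "file:..." token out of the debug string
  val paths = pathPattern.findAllMatchIn(rdd.toDebugString).map(_.group(1)).toList
  // extract the hypothetical ABC_<timestamp>.txt timestamp from each path
  val timestamps = paths.flatMap(p => tsPattern.findFirstMatchIn(p).map(_.group(1).toLong))
  timestamps.foreach(ts => println(s"input file timestamp: $ts"))
}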
Re: Executor size and checkpoints
Tathagata, yes, I was using StreamingContext.getOrCreate. My question is about the design decision here. I was expecting that if I have a streaming application that, say, crashed, and I wanted to give the executors more memory, I would be able to restart, using the checkpointed RDDs but with more memory. I thought deleting the checkpoints in a checkpointed application is the last thing that you want to do (as you lose all state). Seems a bit harsh to have to do this just to increase the amount of memory?

On Mon, Feb 23, 2015 at 11:12 PM, Tathagata Das t...@databricks.com wrote: Hey Yana, I think you posted screenshots, but they are not visible in the email. Probably better to upload them and post links. Are you using StreamingContext.getOrCreate? If that is being used, then it will recreate the SparkContext with a SparkConf having whatever configuration is present in the existing checkpoint files. It may so happen that the existing checkpoint files were from an old run which had 512 configured. So the SparkConf in the restarted SparkContext/StreamingContext is accidentally picking up the old configuration. Deleting the checkpoint files avoided that recovery path, and the new config took effect. Maybe. :) TD

On Sat, Feb 21, 2015 at 7:30 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi all, I had a streaming application and midway through things decided to up the executor memory. I spent a long time launching like this: ~/spark-1.2.0-bin-cdh4/bin/spark-submit --class StreamingTest --executor-memory 2G --master... and observing that the executor memory was still at the old 512 setting. I was about to ask if this is a bug when I decided to delete the checkpoints. Sure enough, the setting took after that. So my question is -- why is it required to remove checkpoints to increase the memory allowed on an executor? This seems pretty unintuitive to me. Thanks for any insights.
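For context, a minimal sketch of the getOrCreate pattern discussed above, which also shows why the old setting survives a restart: the creating function only runs on a fresh start, while on recovery the whole context, including its old SparkConf, is rebuilt from the checkpoint files. The checkpoint path and batch interval are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // hypothetical path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("StreamingTest") // new settings only apply here
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // define the DStream graph here
  ssc
}

// Fresh start: createContext() runs. Recovery: context is rebuilt from checkpointDir.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)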
Running out of space (when there's no shortage)
I'm running a cluster of 3 Amazon EC2 machines (small number because it's expensive when experiments keep crashing after a day!). Today's crash looks like this (stacktrace at end of message).

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

On my three nodes, I have plenty of space and inodes:

A $ df -i
Filesystem   Inodes     IUsed  IFree      IUse%  Mounted on
/dev/xvda1   524288     97937  426351     19%    /
tmpfs        1909200    1      1909199    1%     /dev/shm
/dev/xvdb    2457600    54     2457546    1%     /mnt
/dev/xvdc    2457600    24     2457576    1%     /mnt2
/dev/xvds    831869296  23844  831845452  1%     /vol0

A $ df -h
Filesystem   Size   Used  Avail  Use%  Mounted on
/dev/xvda1   7.9G   3.4G  4.5G   44%   /
tmpfs        7.3G   0     7.3G   0%    /dev/shm
/dev/xvdb    37G    1.2G  34G    4%    /mnt
/dev/xvdc    37G    177M  35G    1%    /mnt2
/dev/xvds    1000G  802G  199G   81%   /vol0

B $ df -i
Filesystem   Inodes     IUsed  IFree      IUse%  Mounted on
/dev/xvda1   524288     97947  426341     19%    /
tmpfs        1906639    1      1906638    1%     /dev/shm
/dev/xvdb    2457600    54     2457546    1%     /mnt
/dev/xvdc    2457600    24     2457576    1%     /mnt2
/dev/xvds    816200704  24223  816176481  1%     /vol0

B $ df -h
Filesystem   Size   Used  Avail  Use%  Mounted on
/dev/xvda1   7.9G   3.6G  4.3G   46%   /
tmpfs        7.3G   0     7.3G   0%    /dev/shm
/dev/xvdb    37G    1.2G  34G    4%    /mnt
/dev/xvdc    37G    177M  35G    1%    /mnt2
/dev/xvds    1000G  805G  195G   81%   /vol0

C $ df -i
Filesystem   Inodes     IUsed  IFree      IUse%  Mounted on
/dev/xvda1   524288     97938  426350     19%    /
tmpfs        1906897    1      1906896    1%     /dev/shm
/dev/xvdb    2457600    54     2457546    1%     /mnt
/dev/xvdc    2457600    24     2457576    1%     /mnt2
/dev/xvds    755218352  24024  755194328  1%     /vol0

C $ df -h
Filesystem   Size   Used  Avail  Use%  Mounted on
/dev/xvda1   7.9G   3.4G  4.5G   44%   /
tmpfs        7.3G   0     7.3G   0%    /dev/shm
/dev/xvdb    37G    1.2G  34G    4%    /mnt
/dev/xvdc    37G    177M  35G    1%    /mnt2
/dev/xvds    1000G  820G  181G   82%   /vol0

The devices may be ~80% full but that still leaves ~200G free on each. My spark-env.sh has:

export SPARK_LOCAL_DIRS=/vol0/spark

I have manually verified that on each slave the only temporary files are stored on /vol0, all looking something like this:

/vol0/spark/spark-f05d407c/spark-fca3e573/spark-78c06215/spark-4f0c4236/20/rdd_8_884

So it looks like all the files are being stored on the large drives (incidentally they're AWS EBS volumes, but that's the only way to get enough storage). My process crashed before with a slightly different exception under the same circumstances:

kryo.KryoException: java.io.IOException: No space left on device

These both happen after several hours and several GB of temporary files. Why does Spark think it's run out of space?
TIA Joe

Stack trace 1:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:109)
RE: Union and reduceByKey will trigger shuffle even same partition?
Hi Imran, I will say your explanation is extremely helpful :) I tested some ideas according to your explanation and it makes perfect sense to me. I modified my code to use cogroup+mapValues instead of union+reduceByKey to preserve the partitioning, which gives me more than 100% performance gain (for the loop part). Thanks a lot! And I am curious: is there any easy way for me to get a detailed DAG execution plan description without running the code? Just like the explain command in Pig or SQL? Shuai

From: Imran Rashid [mailto:iras...@cloudera.com] Sent: Monday, February 23, 2015 6:00 PM To: Shuai Zheng Cc: Shao, Saisai; user@spark.apache.org Subject: Re: Union and reduceByKey will trigger shuffle even same partition?

I think you're getting tripped up by lazy evaluation and the way stage boundaries work (admittedly it's pretty confusing in this case). It is true that up until recently, if you unioned two RDDs with the same partitioner, the result did not have the same partitioner. But that was just fixed here: https://github.com/apache/spark/pull/4629 That does mean that after you update ranks, it will no longer have a partitioner, which will affect the join on your second iteration here: val contributions = links.join(ranks).flatMap

But, I think most of the shuffles you are pointing to are a different issue. I may be belaboring something you already know, but I think this is easily confusing. I think the first thing is understanding where you get stage boundaries, and how they are named. Each shuffle introduces a stage boundary. However, the stages get named by the last thing in a stage, which is not really what is always causing the shuffle. E.g., reduceByKey() causes a shuffle, but we don't see that in a stage name. Similarly, map() does not cause a shuffle, but we see a stage with that name. So, what do the stage boundaries we see actually correspond to?

1) map -- that is doing the shuffle write for the following groupByKey
2) groupByKey -- in addition to reading the shuffle output from your map, this is *also* doing the shuffle write for the next shuffle you introduce w/ partitionBy
3) union -- this is doing the shuffle reading from your partitionBy, and then all the work from there right up until the shuffle write for what is immediately after union -- your reduceByKey.
4) lookup is an action, which is why that has another stage.

a couple of things to note:
(a) your join does not cause a shuffle, b/c both rdds share a partitioner
(b) you have two shuffles from groupByKey followed by partitionBy -- you really probably want the 1-arg form of groupByKey(partitioner)

hopefully this is helpful to understand how your stages and shuffles correspond to your code. Imran

On Mon, Feb 23, 2015 at 3:35 PM, Shuai Zheng szheng.c...@gmail.com wrote: This also triggers an interesting question: how can I do this locally in code if I want? For example: I have RDD A and B, which have some partitions; then if I want to join A to B, I might just want to do a mapper-side join (although B itself might be big, B's local partition is known to be small enough to put in memory). How can I access the other RDD's local partition in the mapPartitions method? Is there any way to do this in Spark?

From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Monday, February 23, 2015 3:13 PM To: Shuai Zheng Cc: user@spark.apache.org Subject: RE: Union and reduceByKey will trigger shuffle even same partition?
If you call reduceByKey(), internally Spark will introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know the data is already well partitioned. So if you want to avoid a shuffle, you have to write the code explicitly to avoid this, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic. Thanks, Saisai

From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 12:00 PM To: user@spark.apache.org Subject: Union and reduceByKey will trigger shuffle even same partition?

Hi All, I am running a simple page rank program, but it is slow. And I dug out that part of the reason is that a shuffle happens when I call a union action, even though both RDDs share the same partitioner. Below is my test code in the spark shell:

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
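For reference, a sketch of the cogroup+mapValues rewrite Shuai mentions above, assuming ranks and leakedMatrix are RDD[(Int, Double)] sharing the same HashPartitioner as in the code above. cogroup on co-partitioned RDDs avoids a shuffle, and mapValues preserves the partitioner, unlike union followed by reduceByKey:

// combines the two per-key value sets locally, keeping the existing partitioning
val combined = ranks.cogroup(leakedMatrix).mapValues {
  case (rankValues, leakValues) => rankValues.sum + leakValues.sum
}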
Re: How to get yarn logs to display in the spark or yarn history-server?
Hi Colin, Here is how I have configured my hadoop cluster to have yarn logs available through both the yarn CLI and the _yarn_ history server (with gzip compression and 10 days retention):

1. Add the following properties in the yarn-site.xml on each node manager and on the resource manager:

<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<property>
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>864000</value>
</property>
<property>
  <name>yarn.log.server.url</name>
  <value>http://dc1-kdp-dev-hadoop-03.dev.dc1.kelkoo.net:19888/jobhistory/logs</value>
</property>
<property>
  <name>yarn.nodemanager.log-aggregation.compression-type</name>
  <value>gz</value>
</property>

2. Restart yarn and then start the yarn history server on the server defined in the yarn.log.server.url property above:

/opt/hadoop/sbin/mr-jobhistory-daemon.sh stop historyserver # should fail if historyserver is not yet started
/opt/hadoop/sbin/stop-yarn.sh
/opt/hadoop/sbin/start-yarn.sh
/opt/hadoop/sbin/mr-jobhistory-daemon.sh start historyserver

It may be slightly different for you if the resource manager and the history server are not on the same machine. Hope it will work for you as well! Christophe.

On 24/02/2015 06:31, Colin Kincaid Williams wrote: Hi, I have been trying to get my yarn logs to display in the spark history-server or yarn history-server. I can see the log information:

yarn logs -applicationId application_1424740955620_0009
15/02/23 22:15:14 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to us3sm2hbqa04r07-comp-prod-local

Container: container_1424740955620_0009_01_02 on us3sm2hbqa07r07.comp.prod.local_8041
===
LogType: stderr
LogLength: 0
Log Contents:
LogType: stdout
LogLength: 897
Log Contents:
[GC [PSYoungGen: 262656K->23808K(306176K)] 262656K->23880K(1005568K), 0.0283450 secs] [Times: user=0.14 sys=0.03, real=0.03 secs]
Heap
 PSYoungGen total 306176K, used 111279K [0xeaa8, 0x0001, 0x0001)
  eden space 262656K, 33% used [0xeaa8,0xeffebbe0,0xfab0)
  from space 43520K, 54% used [0xfab0,0xfc240320,0xfd58)
  to space 43520K, 0% used [0xfd58,0xfd58,0x0001)
 ParOldGen total 699392K, used 72K [0xbff8, 0xeaa8, 0xeaa8)
  object space 699392K, 0% used [0xbff8,0xbff92010,0xeaa8)
 PSPermGen total 35328K, used 34892K [0xbad8, 0xbd00, 0xbff8)
  object space 35328K, 98% used [0xbad8,0xbcf93088,0xbd00)

Container: container_1424740955620_0009_01_03 on us3sm2hbqa09r09.comp.prod.local_8041
===
LogType: stderr
LogLength: 0
Log Contents:
LogType: stdout
LogLength: 896
Log Contents:
[GC [PSYoungGen: 262656K->23725K(306176K)] 262656K->23797K(1005568K), 0.0358650 secs] [Times: user=0.28 sys=0.04, real=0.04 secs]
Heap
 PSYoungGen total 306176K, used 65712K [0xeaa8, 0x0001, 0x0001)
  eden space 262656K, 15% used [0xeaa8,0xed380bf8,0xfab0)
  from space 43520K, 54% used [0xfab0,0xfc22b4f8,0xfd58)
  to space 43520K, 0% used [0xfd58,0xfd58,0x0001)
 ParOldGen total 699392K, used 72K [0xbff8, 0xeaa8, 0xeaa8)
  object space 699392K, 0% used [0xbff8,0xbff92010,0xeaa8)
 PSPermGen total 29696K, used 29486K [0xbad8, 0xbca8, 0xbff8)
  object space 29696K, 99% used [0xbad8,0xbca4b838,0xbca8)

Container: container_1424740955620_0009_01_01 on us3sm2hbqa09r09.comp.prod.local_8041
===
LogType: stderr
LogLength: 0
Log Contents:
LogType: stdout
LogLength: 21
Log Contents:
Pi is roughly 3.1416

I can see some details for the application in the spark history-server at this url: http://us3sm2hbqa04r07.comp.prod.local:18080/history/application_1424740955620_0009/jobs/ . When running in spark-master mode, I can see the stdout and stderr somewhere in the spark history-server.
Then how do I get the information which I see above into the Spark history-server?
Re: How to get yarn logs to display in the spark or yarn history-server?
Looks like in my tired state, I didn't mention Spark the whole time. However, it might be implied by the application log above. Spark log aggregation appears to be working, since I can run the yarn command above. I do have yarn logging set up for the yarn history server. I was trying to use the spark history-server, but maybe I should try setting spark.yarn.historyServer.address to the yarn history-server, instead of the spark history-server? I tried this configuration when I started, but didn't have much luck. Are you getting your spark apps, run in yarn client or cluster mode, in your yarn history server? If so can you share any spark settings?

On Tue, Feb 24, 2015 at 8:48 AM, Christophe Préaud christophe.pre...@kelkoo.com wrote: Hi Colin, Here is how I have configured my hadoop cluster to have yarn logs available through both the yarn CLI and the _yarn_ history server (with gzip compression and 10 days retention): [...]
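For what it's worth, these are the Spark-side settings involved when wiring up history information; a sketch of spark-defaults.conf entries with hypothetical host and path values. Note that spark.yarn.historyServer.address makes the YARN ResourceManager's application links point at the Spark history server; it does not merge yarn container logs into it:

spark.eventLog.enabled            true
spark.eventLog.dir                hdfs:///user/spark/applicationHistory
spark.yarn.historyServer.address  sparkhistory.example.com:18080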
Re: How to get yarn logs to display in the spark or yarn history-server?
the spark history server and the yarn history server are totally independent. Spark knows nothing about yarn logs, and vice versa, so unfortunately there isn't any way to get all the info in one place.

On Tue, Feb 24, 2015 at 12:36 PM, Colin Kincaid Williams disc...@uw.edu wrote: Looks like in my tired state, I didn't mention Spark the whole time. However, it might be implied by the application log above. [...]
Re: How to get yarn logs to display in the spark or yarn history-server?
So back to my original question. I can see the spark logs using the example above: yarn logs -applicationId application_1424740955620_0009. This shows yarn log aggregation working. I can see the stdout and stderr in that container information above. Then how can I get this information in a web UI? Is this not currently supported?

On Tue, Feb 24, 2015 at 10:44 AM, Imran Rashid iras...@cloudera.com wrote: the spark history server and the yarn history server are totally independent. Spark knows nothing about yarn logs, and vice versa, so unfortunately there isn't any way to get all the info in one place. [...]
Re: Broadcast Variable updated from one transformation and used from another
Sorry for the mistake, I actually have it this way:

val myObject = new MyObject()
val myObjectBroadcasted = sc.broadcast(myObject)

val rdd1 = sc.textFile("/file1").map(e => {
  myObjectBroadcasted.value.insert(e._1)
  (e._1, 1)
})
rdd1.cache.count() // to make sure it is transformed

val rdd2 = sc.textFile("/file2").map(e => {
  val lookedUp = myObjectBroadcasted.value.lookup(e._1)
  (e._1, lookedUp)
})

On 24 February 2015 at 17:36, Ganelin, Ilya ilya.gane...@capitalone.com wrote: You're not using the broadcasted variable within your map operations. You're attempting to modify myObject directly, which won't work because you are modifying the serialized copy on the executor. You want to do myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup.

-Original Message-
From: Yiannis Gkoufas [johngou...@gmail.com]
Sent: Tuesday, February 24, 2015 12:12 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Broadcast Variable updated from one transformation and used from another

Hi all, I am trying to do the following.

val myObject = new MyObject()
val myObjectBroadcasted = sc.broadcast(myObject)

val rdd1 = sc.textFile("/file1").map(e => {
  myObject.insert(e._1)
  (e._1, 1)
})
rdd1.cache.count() // to make sure it is transformed

val rdd2 = sc.textFile("/file2").map(e => {
  val lookedUp = myObject.lookup(e._1)
  (e._1, lookedUp)
})

When I check the contents of myObject within the map of rdd1 everything seems ok. On the other hand when I check the contents of myObject within the map of rdd2 it seems to be empty. Am I doing something wrong? Thanks a lot!
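As a self-contained variant of the pattern in this thread (hypothetical names throughout): broadcast variables are read-only on the executors, so a reliable approach is to build the lookup structure on the driver first and only read it inside transformations:

// build the lookup table on the driver before broadcasting
val lookupTable: Map[String, Int] =
  sc.textFile("/file1").map(line => (line, 1)).collect().toMap
val tableBc = sc.broadcast(lookupTable)

// executors only read the broadcast value, never mutate it
val rdd2 = sc.textFile("/file2").map { line =>
  (line, tableBc.value.get(line)) // Option[Int]
}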
[SparkSQL] Number of map tasks in SparkSQL
Shark used to have a shark.map.tasks variable. Is there an equivalent for Spark SQL? We are trying a scenario with heavily partitioned Hive tables. We end up with a UnionRDD with a lot of partitions underneath, and hence too many tasks: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala#L202 Is there a good way to tell SQL to coalesce these? Thanks for any pointers
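There is no direct SparkSQL setting named here (spark.sql.shuffle.partitions only controls shuffle stages, not the scan-side UnionRDD), but one workaround sketch, with a hypothetical table name and target partition count, is to coalesce the query result before further processing:

// sqlContext.sql returns a SchemaRDD in this era; coalesce collapses the many
// scan partitions into fewer tasks without a shuffle
val rows = sqlContext.sql("SELECT * FROM heavily_partitioned_table")
val fewerTasks = rows.coalesce(64, shuffle = false)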
RE: Broadcast Variable updated from one transformation and used from another
You're not using the broadcasted variable within your map operations. You're attempting to modify myObject directly, which won't work because you are modifying the serialized copy on the executor. You want to do myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup.

-Original Message-
From: Yiannis Gkoufas [johngou...@gmail.com]
Sent: Tuesday, February 24, 2015 12:12 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Broadcast Variable updated from one transformation and used from another

Hi all, I am trying to do the following. [...]
Brodcast Variable updated from one transformation and used from another
Hi all, I am trying to do the following.

val myObject = new MyObject()
val myObjectBroadcasted = sc.broadcast(myObject)

val rdd1 = sc.textFile("/file1").map(e => {
  myObject.insert(e._1)
  (e._1, 1)
})
rdd1.cache.count() // to make sure it is transformed

val rdd2 = sc.textFile("/file2").map(e => {
  val lookedUp = myObject.lookup(e._1)
  (e._1, lookedUp)
})

When I check the contents of myObject within the map of rdd1 everything seems ok. On the other hand when I check the contents of myObject within the map of rdd2 it seems to be empty. Am I doing something wrong? Thanks a lot!
Re: Sharing Spark Drivers
I am aware of that, but two things are working against me here with spark-kernel. Python is our language, and we are really looking for a supported way to approach this for the enterprise. I like the concept, it just doesn't work for us given our constraints. This does raise an interesting point, though: if side projects are spinning up to support this, why not make it a feature of the main project? Or is it just that esoteric that it's not important for the main project to be looking into it?

On Tue, Feb 24, 2015 at 9:25 AM, Chip Senkbeil chip.senkb...@gmail.com wrote: Hi John, This would be a potential application for the Spark Kernel project (https://github.com/ibm-et/spark-kernel). The Spark Kernel serves as your driver application, allowing you to feed it snippets of code (or load up entire jars via magics) in Scala to execute against a Spark cluster. Although not technically supported, you can connect multiple applications to the same Spark Kernel instance to use the same resources (both on the cluster and on the driver). If you're curious, you can find a getting started section here: https://github.com/ibm-et/spark-kernel/wiki/Getting-Started-with-the-Spark-Kernel Signed, Chip Senkbeil

On Tue Feb 24 2015 at 8:04:08 AM John Omernik j...@omernik.com wrote: I have been posting on the Mesos list, as I am looking to see whether it's possible or not to share spark drivers. [...]

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running multiple threads with same Spark Context
It's hard to tell. I have not run this on EC2, but this worked for me. The only thing that I can think of is that the scheduling mode is set to FAIR (Scheduling Mode: FAIR).

import java.util.concurrent.{ExecutorService, Executors}
import scala.compat.Platform

val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
// while loop to get curr_job
pool.execute(new ReportJob(sqlContext, curr_job, i))

class ReportJob(sqlContext: org.apache.spark.sql.hive.HiveContext, query: String, id: Int) extends Runnable with Logging {
  def threadId = Thread.currentThread.getName() + "\t"

  def run() {
    logInfo(s"* Running ${threadId} ${id}")
    val startTime = Platform.currentTime
    val hiveQuery = query
    val result_set = sqlContext.sql(hiveQuery)
    // repartition returns a new RDD, so chain it into the write
    result_set.repartition(1).saveAsParquetFile(s"hdfs:///tmp/${id}")
    logInfo(s"* DONE ${threadId} ${id} time: " + (Platform.currentTime - startTime))
  }
}

On Tue, Feb 24, 2015 at 4:04 AM, Harika matha.har...@gmail.com wrote: Hi all, I have been running a simple SQL program on Spark. To test the concurrency, I have created 10 threads inside the program, all threads using the same SQLContext object. When I ran the program on my EC2 cluster using spark-submit, only 3 threads were running in parallel. I have repeated the test on different EC2 clusters (containing different numbers of cores) and found out that only 3 threads are running in parallel on every cluster. Why is this behaviour seen? What does this number 3 specify? Is there any configuration parameter that I have to set if I want to run more threads concurrently? Thanks Harika -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-multiple-threads-with-same-Spark-Context-tp21784.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
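For reference, a sketch of enabling the FAIR scheduling mode mentioned above (the app name is a placeholder). With the default FIFO scheduler, jobs submitted from multiple threads queue up behind each other, which can look like a hard cap on concurrency:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ConcurrentSQL")
  .set("spark.scheduler.mode", "FAIR") // let jobs from different threads share the cluster
val sc = new SparkContext(conf)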
throughput in the web console?
Hi, I plan to run a parameter search varying the number of cores, epochs, and parallelism. The web console provides a way to archive previous runs, but is there a way to view the throughput in the console, rather than logging the throughput separately to the log files and correlating the log files with the web console processing times? Thanks, Josh
Re: Running out of space (when there's no shortage)
Usually it happens in Linux when an application deletes a file without double-checking that there are no open FDs (resource leak). In this case, Linux holds all the allocated space and does not release it until the application exits (crashes, in your case). You check the file system and everything is normal: you have enough space, and you have no idea why the application reports no space left on device. Just a guess. -Vladimir Rodionov

On Tue, Feb 24, 2015 at 8:34 AM, Joe Wass jw...@crossref.org wrote: I'm running a cluster of 3 Amazon EC2 machines (small number because it's expensive when experiments keep crashing after a day!). [...]
Re: Running out of space (when there's no shortage)
Here is a tool which may give you some clue: http://file-leak-detector.kohsuke.org/ Cheers

On Tue, Feb 24, 2015 at 11:04 AM, Vladimir Rodionov vrodio...@splicemachine.com wrote: Usually it happens in Linux when an application deletes a file without double-checking that there are no open FDs (resource leak). [...]
Re: Running out of space (when there's no shortage)
Hi there, I assume you are using spark 1.2.1 right? I faced the exact same issue and switched to 1.1.1 with the same configuration and it was solved.

On 24 Feb 2015 19:22, Ted Yu yuzhih...@gmail.com wrote: Here is a tool which may give you some clue: http://file-leak-detector.kohsuke.org/ [...]
Re: [ML][SQL] Select UserDefinedType attribute in a DataFrame
If you make `Image` a case class, then select("image.data") should work. On Tue, Feb 24, 2015 at 3:06 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I have a DataFrame that contains a user defined type. The type is an image with the following attribute:

class Image(w: Int, h: Int, data: Vector)

In my DataFrame, images are stored in a column named image that corresponds to the following case class:

case class LabeledImage(label: Int, image: Image)

How can I select the image.data attribute of my image object and view it as a column of a DataFrame? I'd like to do something like:

val featureDF = imagesDF.select("image.data").as("features")

Cheers, Jao - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
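For reference, a minimal sketch of the case-class route Xiangrui describes, assuming the Spark 1.3 DataFrame API and simplifying data to Seq[Double] to sidestep the user defined type (all names here are illustrative):

  import org.apache.spark.sql.SQLContext

  case class Image(w: Int, h: Int, data: Seq[Double])
  case class LabeledImage(label: Int, image: Image)

  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val imagesDF = sqlContext.createDataFrame(Seq(
    LabeledImage(0, Image(2, 2, Seq(0.1, 0.2, 0.3, 0.4)))))

  // Nested struct fields resolve with dot notation.
  val featureDF = imagesDF.select($"image.data".as("features"))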
SparkStreaming failing with exception Could not compute split, block input
Hi Experts, My Spark job is failing with the below error. From the logs I can see that input-3-1424842351600 was added at 5:32:32 and was never purged out of memory. Also, the available free memory for the executor is 2.1G. Please help me figure out why executors cannot fetch this input. Txz for any help, Cheers.

Logs:
15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added input-3-1424842351600 in memory on chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
.
.
15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919 (size: 232.3 KB, free: 2.1 GB)
15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751 (size: 291.4 KB, free: 2.1 GB)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception: Could not compute split, block input-3-1424842351600 not found
  at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
  at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception: Could not compute split, block input-3-1424842355600 not found
  at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
  ...

--
Thanks & Regards,
Mukesh Jha me.mukesh@gmail.com
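One thing that may be worth checking (a guess, not a confirmed diagnosis of this report): the storage level passed when the input stream is created controls whether receiver blocks can spill to disk instead of being dropped when executor memory is tight. A sketch with a made-up host and port:

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext(sc, Seconds(10))
  // MEMORY_AND_DISK_SER_2 keeps a replica and allows spilling to disk,
  // so a block is less likely to vanish before its batch is processed.
  val lines = ssc.socketTextStream("some-host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)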
Re: Not able to update collections
Rdd.foreach runs on the executors. You should use `collect` to fetch the data to the driver. E.g.,

myRdd.collect().foreach { node => mp(node) = 1 }

Best Regards, Shixiong Zhu 2015-02-25 4:00 GMT+08:00 Vijayasarathy Kannan kvi...@vt.edu: Thanks, but it still doesn't seem to work. Below is my entire code.

var mp = scala.collection.mutable.Map[VertexId, Int]()

var myRdd = graph.edges.groupBy[VertexId](f).flatMap { edgesBySrc =>
  func(edgesBySrc, a, b)
}

myRdd.foreach { node => mp(node) = 1 }

Values in mp do not get updated for any element in myRdd.
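Putting Shixiong's advice together with the earlier flatMap suggestion, the whole working pattern would look roughly like this (reusing the thread's own graph, f, func, a and b; note that GraphX's VertexId is an alias for Long):

  val mp = scala.collection.mutable.Map[VertexId, Int]()

  // flatMap builds a distributed RDD of func's results...
  val myRdd = graph.edges.groupBy[VertexId](f).flatMap { edgesBySrc =>
    func(edgesBySrc, a, b)
  }

  // ...and collect() brings them to the driver, where local state can be mutated.
  myRdd.collect().foreach { node => mp(node) = 1 }

  // If the map only ever records counts, countByValue() avoids the mutable map:
  // val counts: scala.collection.Map[VertexId, Long] = myRdd.countByValue()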
Re: Unable to run hive queries inside spark
Hi Denny, yes, the user has all the rights to HDFS. I am running all the spark operations with this user. And my hive-site.xml looks like this:

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

Do I need to do anything explicitly other than placing hive-site.xml in the spark/conf directory? Thanks !! On Wed, Feb 25, 2015 at 11:42 AM, Denny Lee denny.g@gmail.com wrote: The error message you have is: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:file:/user/hive/warehouse/src is not a directory or unable to create one) Could you verify that you (the user you are running under) has the rights to create the necessary folders within HDFS? On Tue, Feb 24, 2015 at 9:06 PM kundan kumar iitr.kun...@gmail.com wrote: Hi, I have placed my hive-site.xml inside spark/conf and I am trying to execute some hive queries given in the documentation. Can you please suggest what wrong am I doing here. [...]
Re: spark streaming: stderr does not roll
I'm also facing the same issue. I tried the configurations, but it seems the executors' spark log4j.properties overrides the passed values, so you have to change /etc/spark/conf/log4j.properties. Let me know if any of you have managed to get this fixed programmatically. I am planning to use logrotate to rotate these logs. On Thu, Nov 13, 2014 at 1:45 AM, Nguyen, Duc duc.ngu...@pearson.com wrote: I've also tried setting the aforementioned properties using System.setProperty() as well as on the command line while submitting the job using --conf key=value. All to no success. When I go to the Spark UI and click on that particular streaming job and then the Environment tab, I can see the properties are correctly set. But regardless of what I've tried, the stderr log file on the worker nodes does not roll and continues to grow... leading to a crash of the cluster once it claims 100% of disk. Has anyone else encountered this? Anyone? On Fri, Nov 7, 2014 at 3:35 PM, Nguyen, Duc duc.ngu...@pearson.com wrote: We are running Spark streaming jobs (version 1.1.0). After a sufficient amount of time, the stderr file grows until the disk is full at 100% and crashes the cluster. I've read this https://github.com/apache/spark/pull/895 and also read this http://spark.apache.org/docs/latest/configuration.html#spark-streaming So I've tried testing with this in an attempt to get the stderr log file to roll:

sparkConf.set("spark.executor.logs.rolling.strategy", "size")
  .set("spark.executor.logs.rolling.size.maxBytes", "1024")
  .set("spark.executor.logs.rolling.maxRetainedFiles", "3")

Yet it does not roll and continues to grow. Am I missing something obvious? thanks, Duc

--
Thanks & Regards,
Mukesh Jha me.mukesh@gmail.com
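For anyone following the log4j route Mukesh mentions, one way people point executor logging at a rolling appender is to edit the executors' /etc/spark/conf/log4j.properties. A sketch assuming log4j 1.2; the file path and sizes are illustrative, and this rolls the appender's own file rather than the stderr redirect itself:

  log4j.rootCategory=INFO, rolling
  log4j.appender.rolling=org.apache.log4j.RollingFileAppender
  log4j.appender.rolling.file=/var/log/spark/executor.log
  log4j.appender.rolling.maxFileSize=50MB
  log4j.appender.rolling.maxBackupIndex=5
  log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
  log4j.appender.rolling.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n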
Re: Spark excludes fastutil dependencies we need
bq. depend on missing fastutil classes like Long2LongOpenHashMap

Looks like Long2LongOpenHashMap should be added to the shaded jar. Cheers On Tue, Feb 24, 2015 at 7:36 PM, Jim Kleckner j...@cloudphysics.com wrote: Spark includes the clearspring analytics package but intentionally excludes the dependencies of the fastutil package (see below). Spark includes parquet-column, which includes fastutil and relocates it under parquet/, but creates a shaded jar file which is incomplete because it shades out some of the fastutil classes, notably Long2LongOpenHashMap, which is present in the fastutil jar file that parquet-column is referencing. We are using more of the clearspring classes (e.g. QDigest) and those do depend on missing fastutil classes like Long2LongOpenHashMap. Even though I add them to our assembly jar file, the class loader finds the Spark assembly and we get runtime class loader errors when we try to use it. It is possible to put our jar file first, as described here: https://issues.apache.org/jira/browse/SPARK-939 http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment which I tried with args to spark-submit:

--conf spark.driver.userClassPathFirst=true
--conf spark.executor.userClassPathFirst=true

but we still get the class-not-found error. We have tried copying the source code for clearspring into our package and renaming the package, and that makes it appear to work... Is this risky? It certainly is ugly. Can anyone recommend a way to deal with this dependency hell?

===
The spark/pom.xml file contains the following lines:

<dependency>
  <groupId>com.clearspring.analytics</groupId>
  <artifactId>stream</artifactId>
  <version>2.7.0</version>
  <exclusions>
    <exclusion>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>fastutil</artifactId>
    </exclusion>
  </exclusions>
</dependency>

===
The parquet-column/pom.xml file contains:

<artifactId>maven-shade-plugin</artifactId>
<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <minimizeJar>true</minimizeJar>
      <artifactSet>
        <includes>
          <include>it.unimi.dsi:fastutil</include>
        </includes>
      </artifactSet>
      <relocations>
        <relocation>
          <pattern>it.unimi.dsi</pattern>
          <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
        </relocation>
      </relocations>
    </configuration>
  </execution>
</executions>

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-excludes-fastutil-dependencies-we-need-tp21794.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
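An alternative to copying the clearspring sources that some projects use is to shade-and-relocate clearspring and fastutil inside your own assembly, so your copies can never collide with the classes on Spark's classpath. A sketch for maven-shade-plugin (the shadedPattern prefix is whatever namespace you own; untested against this exact setup):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.clearspring.analytics</pattern>
            <shadedPattern>myapp.shaded.clearspring</shadedPattern>
          </relocation>
          <relocation>
            <pattern>it.unimi.dsi</pattern>
            <shadedPattern>myapp.shaded.it.unimi.dsi</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>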
Help vote for Spark talks at the Hadoop Summit
Hi all, The Hadoop Summit uses community choice voting to decide which talks to feature. It would be great if the community could help vote for Spark talks so that Spark has a good showing at this event. You can make three votes on each track. Below I've listed 3 talks that are important to Spark's roadmap. Please give 3 votes to each of the following talks. Committer Track: Lessons from Running Ultra Large Scale Spark Workloads on Hadoop https://hadoopsummit.uservoice.com/forums/283260-committer-track/suggestions/7074016 Data Science track: DataFrames: large-scale data science on Hadoop data with Spark https://hadoopsummit.uservoice.com/forums/283261-data-science-and-hadoop/suggestions/7074147 Future of Hadoop track: Online Approximate OLAP in SparkSQL https://hadoopsummit.uservoice.com/forums/283266-the-future-of-apache-hadoop/suggestions/7074424 Thanks!
Re: Unable to run hive queries inside spark
The error message you have is: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:file:/user/hive/warehouse/src is not a directory or unable to create one) Could you verify that you (the user you are running under) has the rights to create the necessary folders within HDFS? On Tue, Feb 24, 2015 at 9:06 PM kundan kumar iitr.kun...@gmail.com wrote: Hi, I have placed my hive-site.xml inside spark/conf and I am trying to execute some hive queries given in the documentation. Can you please suggest what wrong am I doing here. [...]
used cores are less than total no. of cores
Hi All, I am running a simple word count example on Spark (standalone cluster). In the UI, each worker shows 32 available cores, but while running the jobs only 5 cores are being used. What should I do to increase the number of used cores, or is it selected based on the jobs? Thanks Somnath CAUTION - Disclaimer * This e-mail contains PRIVILEGED AND CONFIDENTIAL INFORMATION intended solely for the use of the addressee(s). If you are not the intended recipient, please notify the sender by e-mail and delete the original message. Further, you are not to copy, disclose, or distribute this e-mail or its contents to any other person and any such actions are unlawful. This e-mail may contain viruses. Infosys has taken every reasonable precaution to minimize this risk, but is not liable for any damage you may sustain as a result of any virus in this e-mail. You should carry out your own virus checks before opening the e-mail or attachment. Infosys reserves the right to monitor and review the content of all messages sent to or from this e-mail address. Messages sent to or from this e-mail address may be stored on the Infosys e-mail system. ***INFOSYS End of Disclaimer INFOSYS***
Re: Unable to run hive queries inside spark
That's all you should need to do. Saying this, I did run into an issue similar to this when I was switching Spark versions, which were tied to different default Hive versions (e.g. Spark 1.3 by default works with Hive 0.13.1). I'm wondering if you may be hitting this issue due to that? On Tue, Feb 24, 2015 at 22:40 kundan kumar iitr.kun...@gmail.com wrote: Hi Denny, yes, the user has all the rights to HDFS. I am running all the spark operations with this user. And my hive-site.xml looks like this:

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive/warehouse</value>
  <description>location of default database for the warehouse</description>
</property>

Do I need to do anything explicitly other than placing hive-site.xml in the spark/conf directory? Thanks !! [...]
Re: Executors dropping all memory stored RDDs?
I have a strong suspicion that it was caused by a full disk on the executor. I am not sure whether the executor was supposed to recover from it that way. I cannot be sure about it; I should have had enough disk space, but I think I had some data skew which could have led some executors to run out of disk. So, in case someone else notices behavior like this, make sure you check your cluster monitor (like Ganglia). On Wed, Jan 28, 2015 at 5:40 PM, Thomas Gerber thomas.ger...@radius.com wrote: Hello, I am storing RDDs with the MEMORY_ONLY_SER storage level during the run of a big job. At some point during the job, I went to the Executors page and saw that 80% of my executors did not have stored RDDs anymore (executors.png). On the storage page, everything seems there (storage.png). But if I look at a given RDD (RDD_83.png), although it says on top that all 100 partitions are cached, when I look at the details, only 17 are actually stored (RDD_83_partitions), all on the 20% of executors that still had stored RDDs based on the Executors page. So I wonder:
1. Are those RDDs still cached (in which case, we have a small reporting error), or not?
2. If not, what could cause an executor to drop its memory-stored RDD blocks? I guess a restart of an executor?
When I compare an executor that seems to have dropped blocks vs one that has not:
* their spark-hadoop-org.apache.spark.deploy.worker.Worker-1-ip-XX-XX-XX-XX.ec2.internal.out content look the same
* they both have the same etime in ps (so, I guess no restart?)
* I didn't see anything in the app log in the work folder (but it is large, so I might have missed it)
Also, I must mention that the cluster was doing a lot of GCs, which might be a cause of the trouble. I would appreciate any pointer. Thomas
Re: used cores are less than total no. of cores
Try adding --total-executor-cores 5, where 5 is the number of cores. Thanks, Vishnu On Wed, Feb 25, 2015 at 11:52 AM, Somnath Pandeya somnath_pand...@infosys.com wrote: Hi All, I am running a simple word count example on Spark (standalone cluster). In the UI, each worker shows 32 available cores, but while running the jobs only 5 cores are being used. What should I do to increase the number of used cores, or is it selected based on the jobs? Thanks Somnath
Re: used cores are less than total no. of cores
You can set the following in the conf while creating the SparkContext (if you are not using spark-submit):

.set("spark.cores.max", "32")

Thanks Best Regards On Wed, Feb 25, 2015 at 11:52 AM, Somnath Pandeya somnath_pand...@infosys.com wrote: Hi All, I am running a simple word count example on Spark (standalone cluster). In the UI, each worker shows 32 available cores, but while running the jobs only 5 cores are being used. What should I do to increase the number of used cores, or is it selected based on the jobs? Thanks Somnath
Re: Cannot access Spark web UI
My Hadoop version is Hadoop 2.5.0-cdh5.3.0. From the driver logs [3] I can see that the SparkUI started on a specified port; also, my YARN app tracking URL [1] points to that port, which is in turn getting redirected to the proxy URL [2], which gives me java.net.BindException: Cannot assign requested address. If there were a port conflict issue, the SparkUI start would have had issues, but that is not the case.

[1] YARN: application_1424814313649_0006 spark-realtime-MessageStoreWriter SPARK ciuser root.ciuser RUNNING UNDEFINED 10% http://host21.cloud.com:44648
[2] Proxy URL: http://host28.cloud.com:8088/proxy/application_1424814313649_0006/
[3] Logs:
15/02/25 04:25:02 INFO util.Utils: Successfully started service 'SparkUI' on port 44648.
15/02/25 04:25:02 INFO ui.SparkUI: Started SparkUI at http://host21.cloud.com:44648
15/02/25 04:25:02 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
15/02/25 04:25:02 INFO netty.NettyBlockTransferService: Server created on 41518

On Wed, Feb 18, 2015 at 3:15 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: It seems like it's not able to get a port it needs; are you sure that the required port is available? In what logs did you find this error? On Wed, Feb 18, 2015 at 2:21 PM, Akhil Das ak...@sigmoidanalytics.com wrote: The error says Cannot assign requested address. This means that you need to use the correct address for one of your network interfaces, or 0.0.0.0 to accept connections from all interfaces. Can you paste your spark-env.sh file and /etc/hosts file? Thanks Best Regards On Wed, Feb 18, 2015 at 2:06 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I am running a spark-streaming app inside YARN. I have the Spark History Server running as well (do we need it running to access the UI?). The app is running fine as expected, but Spark's web UI is not accessible. When I try to access the ApplicationMaster of the YARN application I get the below error. This looks very similar to https://issues.apache.org/jira/browse/SPARK-5837 but instead of java.net.ConnectException: Connection refused I am getting java.net.BindException: Cannot assign requested address as shown below. Please let me know if you have faced / fixed this issue; any help is greatly appreciated.

Exception:
HTTP ERROR 500
Problem accessing /proxy/application_1424161379156_0001/.
Reason: Cannot assign requested address
Caused by: java.net.BindException: Cannot assign requested address
  at java.net.PlainSocketImpl.socketBind(Native Method)
  at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376)
  at java.net.Socket.bind(Socket.java:631)
  at java.net.Socket.<init>(Socket.java:423)
  at java.net.Socket.<init>(Socket.java:280)
  at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80)
  at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122)
  at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
  at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
  at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
  at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
  at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:346)
  at org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:188)
  at org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:345)
  at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
  at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
  at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
  at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
  at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66)
  at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900)
  at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
  at org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:84)
  at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
  at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
  at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
  at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
  at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
  at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
  at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109)
  at ...
Re: Running out of space (when there's no shortage)
No problem, Joe. There you go: https://issues.apache.org/jira/browse/SPARK-5081 And also there is this one: https://issues.apache.org/jira/browse/SPARK-5715 which is marked as resolved. On 24 February 2015 at 21:51, Joe Wass jw...@crossref.org wrote: Thanks everyone. Yiannis, do you know if there's a bug report for this regression? For some other (possibly connected) reason I upgraded from 1.1.1 to 1.2.1, but I can't remember what the bug was. Joe [...]
Re: reduceByKey vs countByKey
Hi Sathish, The current implementation of countByKey uses reduceByKey: https://github.com/apache/spark/blob/v1.2.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L332 It seems that countByKey is mostly deprecated: https://issues.apache.org/jira/browse/SPARK-3994 -Jey On Tue, Feb 24, 2015 at 3:53 PM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Hello, Quick question. I am trying to understand the difference between reduceByKey vs countByKey. Which one gives better performance, reduceByKey or countByKey? While we can perform the same count operation using reduceByKey, why do we need countByKey/countByValue? Thanks Sathish - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
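The return types make the practical difference clear: one result stays distributed, the other is shipped to the driver. A small sketch:

  val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

  // reduceByKey combines map-side on the executors and yields another RDD,
  // so it scales to any number of distinct keys.
  val reduced: org.apache.spark.rdd.RDD[(String, Int)] = pairs.reduceByKey(_ + _)

  // countByKey returns a local Map on the driver, so it only suits small key sets.
  val counted: scala.collection.Map[String, Long] = pairs.countByKey()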
Re: Spark 1.3 dataframe documentation
The official documentation will be posted when 1.3 is released (early March). Right now, you can build the docs yourself by running `jekyll build` in docs/. Alternatively, just look at dataframe.py as Ted pointed out. On Tue, Feb 24, 2015 at 6:56 AM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at python/pyspark/sql/dataframe.py ? Cheers On Tue, Feb 24, 2015 at 6:12 AM, poiuytrez guilla...@databerries.com wrote: Hello, I have built Spark 1.3. I can successfully use the dataframe api. However, I am not able to find its api documentation in Python. Do you know when the documentation will be available? Best Regards, poiuytrez -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-dataframe-documentation-tp21789.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to start spark-shell with YARN?
Hi Sean, I launched the spark-shell on the same machine as I started the YARN service. I don't think the port will be an issue. I am new to Spark. I checked the HDFS web UI and the YARN web UI. But I don't know how to check the AM. Can you help? Thanks, David On Tue, Feb 24, 2015 at 8:37 PM Sean Owen so...@cloudera.com wrote: I don't think the build is at issue. The error suggests your App Master can't be contacted. Is there a network port issue? Did the AM fail? On Tue, Feb 24, 2015 at 9:15 AM, Xi Shen davidshe...@gmail.com wrote: Hi Arush, I got the pre-built package from https://spark.apache.org/downloads.html. When I start spark-shell, it prompts: Spark assembly has been built with Hive, including Datanucleus jars on classpath. So don't we have a pre-built package with YARN support? If so, how does spark-submit work? I checked the YARN log, and the job is really submitted and ran successfully. Thanks, David On Tue Feb 24 2015 at 6:35:38 PM Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi, Are you sure that you built Spark for YARN? If standalone works, I am not sure it was built for YARN. Thanks Arush On Tue, Feb 24, 2015 at 12:06 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I followed this guide, http://spark.apache.org/docs/1.2.1/running-on-yarn.html, and tried to start spark-shell with yarn-client:

./bin/spark-shell --master yarn-client

But I got

WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@10.0.2.15:38171] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

in the spark-shell, and other exceptions in the YARN log. Please see http://stackoverflow.com/questions/28671171/spark-shell-cannot-connect-to-yarn for more detail. However, submitting to this cluster works. Also, spark-shell as standalone works. My system:
- ubuntu amd64
- spark 1.2.1
- yarn from hadoop 2.6 stable
Thanks, Xi Shen about.me/davidshen http://about.me/davidshen -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: [ML][SQL] Select UserDefinedType attribute in a DataFrame
Btw, the correct syntax for the alias should be `df.select($"image.data".as("features"))`. On Tue, Feb 24, 2015 at 3:35 PM, Xiangrui Meng men...@gmail.com wrote: If you make `Image` a case class, then select("image.data") should work. On Tue, Feb 24, 2015 at 3:06 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I have a DataFrame that contains a user defined type. The type is an image with the following attribute: class Image(w: Int, h: Int, data: Vector) In my DataFrame, images are stored in a column named image that corresponds to the following case class: case class LabeledImage(label: Int, image: Image) How can I select the image.data attribute of my image object and view it as a column of a DataFrame? I'd like to do something like: val featureDF = imagesDF.select("image.data").as("features") Cheers, Jao - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: New guide on how to write a Spark job in Clojure
Thanks for sharing, Chris. On Tue, Feb 24, 2015 at 4:39 AM, Christian Betz christian.b...@performance-media.de wrote: Hi all, Maybe some of you are interested: I wrote a new guide on how to start using Spark from Clojure. The tutorial covers:
- setting up a project,
- doing REPL- or test-driven development of Spark jobs,
- running Spark jobs locally.
Just read it on https://gorillalabs.github.io/sparkling/articles/tfidf_guide.html. Comments (and pull requests) are very welcome. Sincerely, Chris
Fair Scheduler Pools
Hi, I am trying to use the fair scheduler pools (http://spark.apache.org/docs/latest/job-scheduling.html#fair-scheduler-pools) to schedule two jobs at the same time. In my simple example, I have configured Spark in local mode with 2 cores (local[2]). I have also configured two pools in fairscheduler.xml that each have minShare = 1. With this configuration, I would assume that the jobs in each pool would each get assigned to one core. However, after running some simple experiments and looking at the Spark UI, it doesn't seem like this is the case. Is my understanding incorrect? If not, am I configuring things wrong? I have copied my code and XML below. Thanks, Nick

code:

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("Test")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/etc/tercel/fairscheduler.xml")
val sc = new SparkContext(conf)
val input = sc.parallelize(1 to 10)

new Thread(new Runnable() {
  override def run(): Unit = {
    sc.setLocalProperty("spark.scheduler.pool", "pool1")
    val output1 = input.map { x => Thread.sleep(1000); x }
    output1.count()
  }
}).start()

new Thread(new Runnable() {
  override def run(): Unit = {
    sc.setLocalProperty("spark.scheduler.pool", "pool2")
    val output2 = input.map { x => Thread.sleep(1000); x }
    output2.count()
  }
}).start()

fairscheduler.xml:

<?xml version="1.0"?>
<allocations>
  <pool name="pool1">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>1</minShare>
  </pool>
  <pool name="pool2">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>1</minShare>
  </pool>
</allocations>

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Fair-Scheduler-Pools-tp21791.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Add PredictionIO to Powered by Spark
Added - thanks! I trimmed it down a bit to fit our normal description length. On Mon, Jan 5, 2015 at 8:24 AM, Thomas Stone tho...@prediction.io wrote: Please can we add PredictionIO to https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark PredictionIO http://prediction.io/ PredictionIO is an open source machine learning server for software developers to easily build and deploy predictive applications in production. PredictionIO currently offers two engine templates for Apache Spark MLlib, for recommendation (MLlib ALS) and classification (MLlib Naive Bayes). With these templates, you can create a custom predictive engine for production deployment efficiently. A standard PredictionIO stack is built on top of solid open source technology, such as Scala, Apache Spark, HBase and Elasticsearch. We are already featured on https://databricks.com/certified-on-spark Kind regards and Happy New Year! Thomas -- This page tracks the users of Spark. To add yourself to the list, please email user@spark.apache.org with your organization name, URL, a list of which Spark components you are using, and a short description of your use case. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [ML][SQL] Select UserDefinedType attribute in a DataFrame
By doing so, I got the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: GetField is not valid on fields

Seems that it doesn't like the "image.data" expression. On Wed, Feb 25, 2015 at 12:37 AM, Xiangrui Meng men...@gmail.com wrote: Btw, the correct syntax for the alias should be `df.select($"image.data".as("features"))`. On Tue, Feb 24, 2015 at 3:35 PM, Xiangrui Meng men...@gmail.com wrote: If you make `Image` a case class, then select("image.data") should work. On Tue, Feb 24, 2015 at 3:06 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I have a DataFrame that contains a user defined type. The type is an image with the following attribute: class Image(w: Int, h: Int, data: Vector) In my DataFrame, images are stored in a column named image that corresponds to the following case class: case class LabeledImage(label: Int, image: Image) How can I select the image.data attribute of my image object and view it as a column of a DataFrame? I'd like to do something like: val featureDF = imagesDF.select("image.data").as("features") Cheers, Jao
Re: Spark 1.3 dataframe documentation
Another way to see the Python docs:

$ export PYTHONPATH=$SPARK_HOME/python
$ pydoc pyspark.sql

On Tue, Feb 24, 2015 at 2:01 PM, Reynold Xin r...@databricks.com wrote: The official documentation will be posted when 1.3 is released (early March). Right now, you can build the docs yourself by running `jekyll build` in docs/. Alternatively, just look at dataframe.py as Ted pointed out. On Tue, Feb 24, 2015 at 6:56 AM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at python/pyspark/sql/dataframe.py ? Cheers On Tue, Feb 24, 2015 at 6:12 AM, poiuytrez guilla...@databerries.com wrote: Hello, I have built Spark 1.3. I can successfully use the dataframe api. However, I am not able to find its api documentation in Python. Do you know when the documentation will be available? Best Regards, poiuytrez -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-dataframe-documentation-tp21789.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Can you add Big Industries to the Powered by Spark page?
I've added it, thanks! On Fri, Feb 20, 2015 at 12:22 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, Could you please add Big Industries to the Powered by Spark page at https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ? Company Name: Big Industries URL: http://http://www.bigindustries.be/ Spark Components: Spark Streaming Use Case: Big Content Platform Summary: The Big Content Platform is a business-to-business content asset management service providing a searchable, aggregated source of live news feeds, public domain media and archives of content. The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the platform leverages public datasets like Freebase, DBpedia, Wiktionary, and Geonames to support semantic text enrichment. Kind regards, Emre Sevinç http://www.bigindustries.be/ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Can you add Big Industries to the Powered by Spark page?
Hello, Thanks for adding, but the URL seems to have a typo: when I click it, it tries to open http//www.bigindustries.be/ But it should be: http://www.bigindustries.be/ Kind regards, Emre Sevinç On Feb 25, 2015 12:29 AM, Patrick Wendell pwend...@gmail.com wrote: I've added it, thanks! On Fri, Feb 20, 2015 at 12:22 AM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, Could you please add Big Industries to the Powered by Spark page at https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ? Company Name: Big Industries URL: http://http://www.bigindustries.be/ Spark Components: Spark Streaming Use Case: Big Content Platform Summary: The Big Content Platform is a business-to-business content asset management service providing a searchable, aggregated source of live news feeds, public domain media and archives of content. The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the platform leverages public datasets like Freebase, DBpedia, Wiktionary, and Geonames to support semantic text enrichment. Kind regards, Emre Sevinç http://www.bigindustries.be/
reduceByKey vs countByKey
Hello, Quick question: I am trying to understand the difference between reduceByKey and countByKey. Which one gives better performance, reduceByKey or countByKey? And since we can perform the same count operation using reduceByKey, why do we need countByKey/countByValue? Thanks, Sathish
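For reference, a minimal sketch contrasting the two calls (hypothetical data, not from the thread): countByKey is an action that ships the complete result map back to the driver, while reduceByKey is a transformation whose counts stay distributed in an RDD.

// In the spark-shell, where sc is already defined:
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// Action: materializes every (key, count) on the driver.
// Convenient when the number of distinct keys is small.
val driverCounts: scala.collection.Map[String, Long] = pairs.countByKey()

// Transformation: counts remain distributed and can feed further
// stages or be saved without ever touching the driver.
val rddCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)

In general, countByKey is a convenience for small result sets; with many distinct keys, or when the counts feed further processing, reduceByKey scales better.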
[ML][SQL] Select UserDefinedType attribute in a DataFrame
Hi all, I have a DataFrame that contains a user defined type. The type is an image with the following attributes:

class Image(w: Int, h: Int, data: Vector)

In my DataFrame, images are stored in a column named image that corresponds to the following case class:

case class LabeledImage(label: Int, image: Image)

How can I select the image.data attribute of my image object and view it as a column of a DataFrame? I'd like to do something like:

val featureDF = imagesDF.select("image.data").as("features")

Cheers, Jao
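One possible approach, sketched here with a made-up helper name (extractData) and under the assumption that Image is registered as a UDT so it can pass through a UDF: unwrap the attribute explicitly rather than selecting the nested path.

import org.apache.spark.sql.functions.udf

// Hypothetical: pull the data vector out of the Image object and
// expose it as its own column named "features".
val extractData = udf { (img: Image) => img.data }
val featureDF = imagesDF.select(extractData(imagesDF("image")).as("features"))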
Re: How to start spark-shell with YARN?
It may have to do with the akka heartbeat interval per SPARK-3923 - https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3923 ? On Tue, Feb 24, 2015 at 16:40 Xi Shen davidshe...@gmail.com wrote: Hi Sean, I launched the spark-shell on the same machine as I started the YARN service, so I don't think ports will be an issue. I am new to Spark. I checked the HDFS web UI and the YARN web UI, but I don't know how to check the AM. Can you help? Thanks, David On Tue, Feb 24, 2015 at 8:37 PM Sean Owen so...@cloudera.com wrote: I don't think the build is at issue. The error suggests your App Master can't be contacted. Is there a network port issue? Did the AM fail? On Tue, Feb 24, 2015 at 9:15 AM, Xi Shen davidshe...@gmail.com wrote: Hi Arush, I got the pre-built package from https://spark.apache.org/downloads.html. When I start spark-shell, it prompts: Spark assembly has been built with Hive, including Datanucleus jars on classpath. So we don't have a pre-built package with YARN support? If so, how does spark-submit work? I checked the YARN log, and the job really is submitted and runs successfully. Thanks, David On Tue Feb 24 2015 at 6:35:38 PM Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi, Are you sure that you built Spark for YARN? If standalone works, I am not sure the build is for YARN. Thanks, Arush On Tue, Feb 24, 2015 at 12:06 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I followed this guide, http://spark.apache.org/docs/1.2.1/running-on-yarn.html, and tried to start spark-shell with yarn-client:

./bin/spark-shell --master yarn-client

But I got

WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@10.0.2.15:38171] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

in the spark-shell, and other exceptions in the YARN log. Please see http://stackoverflow.com/questions/28671171/spark-shell-cannot-connect-to-yarn for more detail. However, submitting to this cluster works. Also, spark-shell standalone works. My system:
- Ubuntu amd64
- Spark 1.2.1
- YARN from Hadoop 2.6 stable

Thanks, Xi Shen http://about.me/davidshen -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Unable to run hive queries inside spark
Hi, I have placed my hive-site.xml inside spark/conf and I am trying to execute some Hive queries given in the documentation. Can you please suggest what I am doing wrong here?

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3340a4b8

scala> hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
15/02/25 10:30:59 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING)
15/02/25 10:30:59 INFO ParseDriver: Parse Completed
15/02/25 10:30:59 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/25 10:30:59 INFO ObjectStore: ObjectStore, initialize called
15/02/25 10:30:59 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/02/25 10:30:59 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/02/25 10:30:59 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/02/25 10:30:59 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/02/25 10:31:08 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
15/02/25 10:31:08 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : .
15/02/25 10:31:09 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
15/02/25 10:31:09 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
15/02/25 10:31:15 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
15/02/25 10:31:15 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
15/02/25 10:31:17 INFO ObjectStore: Initialized ObjectStore
15/02/25 10:31:17 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa
15/02/25 10:31:18 INFO HiveMetaStore: Added admin role in metastore
15/02/25 10:31:18 INFO HiveMetaStore: Added public role in metastore
15/02/25 10:31:18 INFO HiveMetaStore: No user is added in admin role, since config is empty
15/02/25 10:31:18 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr.
15/02/25 10:31:18 INFO PerfLogger: <PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>
15/02/25 10:31:18 INFO PerfLogger: <PERFLOG method=TimeToSubmit from=org.apache.hadoop.hive.ql.Driver>
15/02/25 10:31:18 INFO Driver: Concurrency mode is disabled, not creating a lock manager
15/02/25 10:31:18 INFO PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
15/02/25 10:31:18 INFO PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
15/02/25 10:31:18 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING)
15/02/25 10:31:18 INFO ParseDriver: Parse Completed
15/02/25 10:31:18 INFO PerfLogger: </PERFLOG method=parse start=1424840478985 end=1424840478986 duration=1 from=org.apache.hadoop.hive.ql.Driver>
15/02/25 10:31:18 INFO PerfLogger: <PERFLOG method=semanticAnalyze from=org.apache.hadoop.hive.ql.Driver>
15/02/25 10:31:19 INFO SemanticAnalyzer: Starting Semantic Analysis
15/02/25 10:31:19 INFO SemanticAnalyzer: Creating table src position=27
15/02/25 10:31:19 INFO HiveMetaStore: 0: get_table : db=default tbl=src
15/02/25 10:31:19 INFO audit: ugi=spuser ip=unknown-ip-addr cmd=get_table : db=default tbl=src
15/02/25 10:31:19 INFO HiveMetaStore: 0: get_database: default
15/02/25 10:31:19 INFO audit: ugi=spuser ip=unknown-ip-addr cmd=get_database: default
15/02/25 10:31:19 INFO Driver: Semantic Analysis Completed
15/02/25 10:31:19 INFO PerfLogger: </PERFLOG method=semanticAnalyze start=1424840478986 end=1424840479063 duration=77 from=org.apache.hadoop.hive.ql.Driver>
15/02/25 10:31:19 INFO Driver: Returning Hive schema: Schema(fieldSchemas:null, properties:null)
15/02/25 10:31:19 INFO PerfLogger: </PERFLOG method=compile start=1424840478970 end=1424840479069 duration=99 from=org.apache.hadoop.hive.ql.Driver>
15/02/25 10:31:19 INFO PerfLogger: <PERFLOG method=Driver.execute from=org.apache.hadoop.hive.ql.Driver>
15/02/25 10:31:19 INFO Driver: Starting command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING)
15/02/25 10:31:19 INFO PerfLogger: </PERFLOG method=TimeToSubmit start=1424840478968 end=1424840479072 duration=104
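As an aside, the deprecation warning in the transcript points at hql() itself being the older API. A minimal sketch of the same session using sql() instead (the LOAD DATA path is the sample file from the Spark SQL docs; adjust to your setup):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// sql() is the non-deprecated replacement for hql() on HiveContext
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
hiveContext.sql("FROM src SELECT key, value").collect().foreach(println)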
Spark excludes fastutil dependencies we need
Spark includes the clearspring analytics package but intentionally excludes the dependencies of the fastutil package (see below). Spark includes parquet-column, which includes fastutil and relocates it under parquet/, but creates a shaded jar file which is incomplete because it shades out some of the fastutil classes, notably Long2LongOpenHashMap, which is present in the fastutil jar file that parquet-column is referencing. We are using more of the clearspring classes (e.g. QDigest) and those do depend on missing fastutil classes like Long2LongOpenHashMap. Even though I add them to our assembly jar file, the class loader finds the Spark assembly and we get runtime class loader errors when we try to use it. It is possible to put our jar file first, as described here: https://issues.apache.org/jira/browse/SPARK-939 http://spark.apache.org/docs/1.2.0/configuration.html#runtime-environment which I tried with args to spark-submit: --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true but we still get the class-not-found error. We have tried copying the source code for clearspring into our package and renaming the package, and that makes it appear to work... Is this risky? It certainly is ugly. Can anyone recommend a way to deal with this dependency hell?

=== The spark/pom.xml file contains the following lines:

<dependency>
  <groupId>com.clearspring.analytics</groupId>
  <artifactId>stream</artifactId>
  <version>2.7.0</version>
  <exclusions>
    <exclusion>
      <groupId>it.unimi.dsi</groupId>
      <artifactId>fastutil</artifactId>
    </exclusion>
  </exclusions>
</dependency>

=== The parquet-column/pom.xml file contains:

<artifactId>maven-shade-plugin</artifactId>
<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <minimizeJar>true</minimizeJar>
      <artifactSet>
        <includes>
          <include>it.unimi.dsi:fastutil</include>
        </includes>
      </artifactSet>
      <relocations>
        <relocation>
          <pattern>it.unimi.dsi</pattern>
          <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
        </relocation>
      </relocations>
    </configuration>
  </execution>
</executions>

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-excludes-fastutil-dependencies-we-need-tp21794.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
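A quick diagnostic, offered as a sketch rather than a fix: ask the JVM which jar it actually resolves the contested class from, on the driver and inside an executor task, to confirm whether the userClassPathFirst flags are being honored (Class.forName throws ClassNotFoundException if the class is missing entirely, which is itself informative):

// Driver side: where does Long2LongOpenHashMap come from?
val cls = Class.forName("it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap")
println(cls.getProtectionDomain.getCodeSource.getLocation)

// Executor side: run the same lookup inside a task.
val execLoc = sc.parallelize(1 to 1).map { _ =>
  Class.forName("it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap")
    .getProtectionDomain.getCodeSource.getLocation.toString
}.collect().head
println(execLoc)

If either prints the Spark assembly rather than your jar, the classpath ordering is not taking effect for this class.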
Task not serializable exception
Hi, I run into a Task not serializable exception with the code below. When I remove the threads and run, it works, but with threads I run into the Task not serializable exception.

import breeze.linalg.{DenseVector, Vector, squaredDistance}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKart extends Serializable {
  def parseVector(line: String): Vector[Double] =
    DenseVector(line.split(' ').map(_.toDouble))

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SparkKart")
    val sc = new SparkContext(sparkConf)
    val lines = sc.textFile(args(0))
    val data = lines.map(parseVector _)
    val kPoints = data.takeSample(withReplacement = false, 4, 42).toArray

    val thread1 = new Thread(new Runnable {
      def run() {
        val dist1 = data.map(squaredDistance(_, kPoints(0)))
        dist1.saveAsTextFile("hdfs:/kart3")
      }
    })
    val thread2 = new Thread(new Runnable {
      def run() {
        val dist1 = data.map(squaredDistance(_, kPoints(1)))
        dist1.saveAsTextFile("hdfs:/kart2")
      }
    })
    val thread3 = new Thread(new Runnable {
      def run() {
        val dist1 = data.map(squaredDistance(_, kPoints(2)))
        dist1.saveAsTextFile("hdfs:/kart1")
      }
    })

    thread1.start
    thread2.start
    thread3.start
  }
}

Any help please? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-not-serializable-exception-tp21795.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
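A common workaround for this pattern, given here as a sketch that has not been verified against this exact program: copy whatever the closure needs into local vals inside run(), so the map function captures only serializable values rather than the enclosing anonymous Runnable instance.

val thread1 = new Thread(new Runnable {
  def run() {
    // Bind the sample point to a local val first; the map closure then
    // captures only this vector, not the surrounding Runnable.
    val p = kPoints(0)
    data.map(v => squaredDistance(v, p)).saveAsTextFile("hdfs:/kart3")
  }
})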
Re: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell
Hi Akhil, I guess it slipped my attention. I will definitely give it a try, though I would still like to know: what is the issue with the way I have created the schema? On Tue, Feb 24, 2015 at 4:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you happen to have a look at https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema Thanks Best Regards On Tue, Feb 24, 2015 at 3:39 PM, anu anamika.guo...@gmail.com wrote: My issue is posted here on Stack Overflow. What am I doing wrong here? http://stackoverflow.com/questions/28689186/facing-error-while-extending-scala-class-with-product-interface-to-overcome-limi -- View this message in context: Facing error while extending scala class with Product interface to overcome limit of 22 fields in spark-shell http://apache-spark-user-list.1001560.n3.nabble.com/Facing-error-while-extending-scala-class-with-Product-interface-to-overcome-limit-of-22-fields-in-spl-tp21787.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
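For reference, a minimal sketch of the programmatic-schema approach from that link, which sidesteps the 22-field Product limit of Scala 2.10 case classes by building the schema as data (the field names and file path below are made up; on Spark 1.3+ the call is createDataFrame, while earlier releases use sqlContext.applySchema):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A 30-column schema with no Product/case class involved.
val fieldNames = (1 to 30).map(i => s"col$i")
val schema = StructType(fieldNames.map(StructField(_, StringType, nullable = true)))

val rowRDD = sc.textFile("data.csv").map(_.split(",")).map(p => Row(p: _*))
val df = sqlContext.createDataFrame(rowRDD, schema)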
Re: Is Ubuntu server or desktop better for spark cluster
Check out the FAQ in the link shared by Deepak Vohra. The main differences are that the desktop installation includes common end-user packages, such as LibreOffice, while the server installation doesn't. Instead, the server includes server packages, such as apache2. Also, the desktop has a GUI (a graphical interface to use with a mouse, etc.); the server doesn't. So the server has more free memory for your Spark (it's not used by the GUI), but if you are not quite proficient with a Linux shell it will be a lot harder to start and configure. Also, you can practically convert a desktop into a server or vice versa (by installing and uninstalling packages and changing configurations). If you don't feel completely confident with a pure Linux shell, I would recommend a desktop version. Also, once you have everything set up, you can disable the GUI to free some memory and work from a terminal:

# Open a terminal
Ctrl+Alt+F1
# Shut down the GUI
sudo stop lightdm

(for reference: http://askubuntu.com/questions/148321/how-do-i-stop-gui)

Sebastián Ramírez Diseñador de Algoritmos http://www.senseta.com Tel: (+571) 795 7950 ext: 1012 Cel: (+57) 300 370 77 10 Calle 73 No 7 - 06 Piso 4 Linkedin: co.linkedin.com/in/tiangolo/ Twitter: @tiangolo https://twitter.com/tiangolo Email: sebastian.rami...@senseta.com www.senseta.com

On Sat, Feb 14, 2015 at 12:21 PM, Deepak Vohra dvohr...@yahoo.com.invalid wrote: For a beginner, Ubuntu Desktop is recommended as it includes a GUI and is easier to install. Also refer to the ServerFaq - Community Help Wiki: https://help.ubuntu.com/community/ServerFaq#What.27s_the_difference_between_desktop_and_server.3F

From: Joanne Contact joannenetw...@gmail.com To: user@spark.apache.org Sent: Saturday, February 14, 2015 7:05 AM Subject: Is Ubuntu server or desktop better for spark cluster

Hi gurus, I am trying to set up a real Linux machine (not a VM) where I will install Spark and also Hadoop. I plan on learning about clusters. I found Ubuntu has desktop and server versions. Does it matter? Thanks!! J

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org

This e-mail transmission, including any attachments, is intended only for the named recipient(s) and may contain information that is privileged, confidential and/or exempt from disclosure under applicable law. If you have received this transmission in error, or are not the named recipient(s), please notify Senseta immediately by return e-mail and permanently delete this transmission, including any attachments.
EventLog / Timeline calculation - Optimization
Hello, For the past days I have been trying to process and analyse with Spark a Cassandra eventLog table similar to the one shown here. Basically what I want to calculate is the delta time epoch between each event type for all the device ids in the table. Currently it is working as expected, but I am wondering if there is a better or more optimal way of achieving this kind of calculation in Spark. Note that to simplify the example I have removed all the Cassandra stuff and just use a CSV file.

eventLog.txt:
-------------
dev_id,event_type,event_ts
1,loging,2015-01-03 01:15:00
1,activated,2015-01-03 01:10:00
1,register,2015-01-03 01:00:00
2,get_data,2015-01-02 01:00:10
2,loging,2015-01-02 01:00:00
3,update_data,2015-01-01 01:15:00
3,get_data,2015-01-01 01:10:00
3,loging,2015-01-01 01:00:00
-------------

Spark Code:
-------------
import java.sql.Timestamp

def getDateDiff(d1: String, d2: String): Long =
  Timestamp.valueOf(d2).getTime() - Timestamp.valueOf(d1).getTime()

val rawEvents = sc.textFile("eventLog.txt").map(_.split(","))
  .map(e => (e(0).trim.toInt, e(1).trim, e(2).trim))
val indexed = rawEvents.zipWithIndex.map(_.swap)
val shifted = indexed.map { case (k, v) => (k - 1, v) }
val joined = indexed.join(shifted)
// Filter out pairs whose dev_id's don't match
val cleaned = joined.filter(x => x._2._1._1 == x._2._2._1)
val eventDuration = cleaned.map { case (i, (v1, v2)) =>
  (v1._1, s"${v1._2} - ${v2._2}", getDateDiff(v2._3, v1._3))
}
eventDuration.collect.foreach(println)
-------------

Output:
-------------
(1,loging - activated,30)
(3,get_data - loging,60)
(1,activated - register,60)
(2,get_data - loging,1)
(3,update_data - get_data,30)
-------------

This code was inspired by the following posts:
http://stackoverflow.com/questions/26560292/apache-spark-distance-between-two-points-using-squareddistance
http://apache-spark-user-list.1001560.n3.nabble.com/Cumulative-distance-calculation-on-moving-objects-RDD-td20729.html
http://stackoverflow.com/questions/28236347/functional-approach-in-sequential-rdd-processing-apache-spark

Best regards and thanks in advance for any suggestions, Sebastian

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/EventLog-Timeline-calculation-Optimization-tp21792.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
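One possible alternative, sketched under the assumption that a single device's events fit comfortably in one executor's memory: group by dev_id and pair consecutive events with a local sort and sliding, which avoids indexing and joining rows that belong to different devices.

// Group per device, order by timestamp, emit the delta between neighbors.
val byDevice = rawEvents.map { case (id, ev, ts) => (id, (ev, ts)) }.groupByKey()
val deltas = byDevice.flatMap { case (id, events) =>
  events.toSeq.sortBy(_._2)   // ISO-style timestamps sort correctly as strings
    .sliding(2)
    .collect { case Seq((e1, t1), (e2, t2)) =>
      (id, s"$e2 - $e1", getDateDiff(t1, t2))
    }
}
deltas.collect.foreach(println)

Note that groupByKey still shuffles every event, so this trades the zipWithIndex plus join of the original for a single shuffle and a per-device in-memory sort.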