breeze DGEMM slow in spark
Dear all, I'm testing double-precision matrix multiplication in Spark on EC2 m1.large machines. I use the Breeze linalg library, which internally calls a native library (OpenBLAS, Nehalem, single-threaded).

m1.large:
model name : Intel(R) Xeon(R) CPU E5-2650 0 @ 2.00GHz
cpu MHz : 1795.672
model name : Intel(R) Xeon(R) CPU E5-2650 0 @ 2.00GHz
cpu MHz : 1795.672
os: Linux ip-172-31-24-33 3.4.37-40.44.amzn1.x86_64 #1 SMP Thu Mar 21 01:17:08 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

Here's my test code:

  def main(args: Array[String]) {
    val n = args(0).toInt
    val loop = args(1).toInt
    val ranGen = new Random
    val arr = ofDim[Double](loop, n * n)
    for (i <- 0 until loop)
      for (j <- 0 until n * n) {
        arr(i)(j) = ranGen.nextDouble()
      }
    val time0 = System.currentTimeMillis()
    println("init time = " + time0)
    val c = new DenseMatrix[Double](n, n)
    val time1 = System.currentTimeMillis()
    println("start time = " + time1)
    for (i <- 0 until loop) {
      val a = new DenseMatrix[Double](n, n, arr(i))
      val b = new DenseMatrix[Double](n, n, arr(i))
      c :+= (a * b)
    }
    val time2 = System.currentTimeMillis()
    println("stop time = " + time2)
    println("init time = " + (time1 - time0))
    println("used time = " + (time2 - time1))
  }

Multiplying two n=3584 matrices takes about 14s with the above test code. But when I put the matrix-mult part inside Spark's mapPartitions function:

  val b = a.mapPartitions { itr =>
    val arr = itr.toArray
    // timestamp here
    val a = new DenseMatrix[Double](n, n, arr)
    val b = new DenseMatrix[Double](n, n, arr)
    val c = a * b
    // timestamp here
    c.toIterator
  }

the same two n=3584 matrix mult takes about 50s! There's a shuffle operation before the matrix mult in Spark; during the shuffle phase the aggregated data is kept in memory on the reduce side, with no spill to disk. So both cases are in-memory matrix mults, both have enough memory, and GC time is really small. So why is case 2 about 3.5x slower than case 1? Has anyone met this before, and what DGEMM performance do you get in Spark?
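For calibration, one way to measure raw DGEMM throughput on the same box outside the JVM is with NumPy, which dispatches to whatever BLAS it was built against. This is just a sketch for a baseline comparison, not part of the Spark test; the function name and sizes are illustrative:

```python
import time
import numpy as np

def time_dgemm(n, trials=3):
    """Time an n x n double-precision matrix multiply; return (best seconds, product)."""
    a = np.random.rand(n, n)
    b = np.random.rand(n, n)
    best = float("inf")
    c = None
    for _ in range(trials):
        t0 = time.time()
        c = a.dot(b)  # dispatches to the dgemm of whatever BLAS NumPy is linked against
        best = min(best, time.time() - t0)
    return best, c

if __name__ == "__main__":
    n = 1024  # scale up to 3584 to match the Spark test above
    secs, _ = time_dgemm(n)
    # an n x n multiply is about 2*n^3 flops; report achieved GFLOP/s
    print("n=%d: %.2fs, %.2f GFLOP/s" % (n, secs, 2.0 * n ** 3 / secs / 1e9))
```

If the GFLOP/s here is far above what the Spark task achieves on the same machine, the gap is in the JVM/native bridging or the environment, not the hardware.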
Thanks for any advice. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/breeze-DGEMM-slow-in-spark-tp5950.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Apache Spark Throws java.lang.IllegalStateException: unread block data
What we are doing is: 1. Installing Spark 0.9.1 according to the documentation on the website, along with the CDH4 (and on another cluster, the CDH5) distro of Hadoop/HDFS. 2. Building a fat jar with a Spark app with sbt, then trying to run it on the cluster. I've also included code snippets and sbt deps at the bottom. When I've Googled this, there seem to be two somewhat vague responses: a) mismatching Spark versions between nodes and user code, b) need to add more jars to the SparkConf. Now I know that (b) is not the problem, having successfully run the same code on other clusters while only including one jar (it's a fat jar). But I have no idea how to check for (a) - it appears Spark doesn't have any version checks - it would be nice if it checked versions and threw a mismatching-version exception: you have user code using version X and node Y has version Z. I would be very grateful for advice on this. I've submitted a bug report, because there has to be something wrong with the Spark documentation: I've seen two independent sysadmins get the exact same problem with different versions of CDH on different clusters.
https://issues.apache.org/jira/browse/SPARK-1867

The exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 32 times (most recent failure: Exception failure: java.lang.IllegalStateException: unread block data)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/05/16 18:05:31 INFO scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data
[duplicate 59]

My code snippet:

  val conf = new SparkConf()
    .setMaster(clusterMaster)
    .setAppName(appName)
    .setSparkHome(sparkHome)
    .setJars(SparkContext.jarOfClass(this.getClass))
  println("count = " + new SparkContext(conf).textFile(someHdfsPath).count())

My SBT dependencies:

  // relevant
  "org.apache.spark" % "spark-core_2.10" % "0.9.1",
  "org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0",

  // standard, probably unrelated
  "com.github.seratch" %% "awscala" % "[0.2,)",
  "org.scalacheck" %% "scalacheck" % "1.10.1" % "test",
  "org.specs2" %% "specs2" % "1.14" % "test",
  "org.scala-lang" % "scala-reflect" % "2.10.3",
  "org.scalaz" %% "scalaz-core" % "7.0.5",
  "net.minidev" % "json-smart" % "1.2"

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Throws-java-lang-IllegalStateException-unread-block-data-tp5952.html
Re: Benchmarking Spark with YCSB
Thanks Jay. I honestly think I just had a senior moment or something; I was getting HiBench and YCSB confused. Has anyone attempted to port HiBench to Spark? HiBench performs a lot of map/reduce, and it would be a very interesting comparison for us. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Benchmarking-Spark-with-YCSB-tp5813p5953.html
Re: Using mongo with PySpark
Where's your driver code (the code interacting with the RDDs)? Are you getting serialization errors?

On Saturday, May 17, 2014, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:

Hi all, I am trying to store the results of a reduce into Mongo. I want to share the variable collec in the mappers. Here's what I have so far (I'm using pymongo):

  db = MongoClient()['spark_test_db']
  collec = db['programs']

  def mapper(val):
      asc = val.encode('ascii', 'ignore')
      json = convertToJSON(asc, indexMap)
      collec.insert(json)  # this is not working

  def convertToJSON(string, indexMap):
      values = string.strip().split(",")
      json = {}
      for i in range(len(values)):
          json[indexMap[i]] = values[i]
      return json

How do I do this?
Re: Worker re-spawn and dynamic node joining
Thanks for the info about adding/removing nodes dynamically. That's valuable.

On Friday, May 16, 2014, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Hi Han :)

1. Is there a way to automatically re-spawn Spark workers? We've situations where executor OOM causes the worker process to be DEAD and it does not come back automatically.

=> Yes. You can either add an OOM killer exception (http://backdrift.org/how-to-create-oom-killer-exceptions) on all of your Spark processes, or you can have a cronjob which keeps monitoring your worker processes and brings them back up if they go down.

2. How to dynamically add (or remove) worker machines to (from) the cluster? We'd like to leverage the auto-scaling group in EC2, for example.

=> You can add worker nodes on the fly by spawning a new machine, adding that machine's IP address on the master node, and rsyncing the Spark directory to all worker machines including the one you added. Then simply use the start-all.sh script on the master node to bring the new worker into action. Removing a worker machine can be done in the same way: remove the worker's IP address from the master's slaves file, then restart your slaves and the worker is removed.

FYI, we have a deployment tool (a web-based UI) that we use for internal purposes; it is built on top of the spark-ec2 script (with some changes) and it has a module for adding/removing worker nodes on the fly. It looks like the attached screenshot. If you want, I can give you some access.

Thanks
Best Regards

On Wed, May 14, 2014 at 9:52 PM, Han JU <ju.han.fe...@gmail.com> wrote:

Hi all, just 2 questions:
1. Is there a way to automatically re-spawn Spark workers? We've situations where executor OOM causes the worker process to be DEAD and it does not come back automatically.
2. How to dynamically add (or remove) worker machines to (from) the cluster? We'd like to leverage the auto-scaling group in EC2, for example. We're using Spark standalone.

Thanks a lot.
--
JU Han
Data Engineer @ Botify.com
+33 061960
Re: Worker re-spawn and dynamic node joining
A better way would be to use Mesos (and quite possibly YARN in 1.0.0). That will allow you to add nodes on the fly and leverage them for Spark. Frankly, standalone mode is not meant to handle those issues. That said, we use our deployment tool, as stopping the cluster for adding nodes is not really an issue at the moment.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Sat, May 17, 2014 at 9:05 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

Thanks for the info about adding/removing nodes dynamically. That's valuable.
Re: Using mongo with PySpark
Ideally you have to pass the MongoClient object along with your data into the mapper (Python should try to serialize your MongoClient, but explicit is better); if the client is serializable, then all should end well. If not, then you are better off using mapPartitions, initializing the client once per partition and loading each partition's data with it. There is a similar discussion in the list archives.

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

Where's your driver code (the code interacting with the RDDs)? Are you getting serialization errors?
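The mapPartitions pattern described above can be sketched in plain Python. Here DummyClient stands in for pymongo's MongoClient and simulate_map_partitions stands in for the real RDD.mapPartitions, since neither pymongo nor Spark is assumed available; the point is only where the client gets constructed:

```python
class DummyClient:
    """Stand-in for pymongo.MongoClient. It is created inside the partition
    function, so it is never serialized and shipped from the driver."""
    def __init__(self):
        self.inserted = []

    def insert(self, doc):
        self.inserted.append(doc)
        return doc

def save_partition(records):
    client = DummyClient()  # one client per partition, built on the worker
    for rec in records:
        yield client.insert(rec)

def simulate_map_partitions(partitions, fn):
    """Apply fn to each partition's iterator, like RDD.mapPartitions does."""
    return [list(fn(iter(part))) for part in partitions]

parts = [[{"k": 1}, {"k": 2}], [{"k": 3}]]
out = simulate_map_partitions(parts, save_partition)
# out mirrors the input: each record went through one insert, one client per partition
```

With the real APIs, the same shape would be rdd.mapPartitions(save_partition) with MongoClient constructed inside save_partition.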
Re: Historical Data as Stream
The real question is why you are looking to consume the file as a stream:
1. Too big to load as an RDD?
2. Need to operate on it in sequential order?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Sat, May 17, 2014 at 5:12 AM, Soumya Simanta <soumya.sima...@gmail.com> wrote:

A file is just a stream with a fixed length. Usually streams don't end, but in this case it would. On the other hand, if you read your file as a stream, you may not be able to use the entire data in the file for your analysis. Spark (given enough memory) can process large amounts of data quickly.

On May 15, 2014, at 9:52 AM, Laeeq Ahmed <laeeqsp...@yahoo.com> wrote:

Hi, I have data in a file. Can I read it as a stream in Spark? I know it seems odd to read a file as a stream, but it has practical applications in real life if I can read it as a stream. Is there any other tool which can give this file as a stream to Spark, or do I have to make batches manually, which is not what I want? It's a column of a million values.

Regards,
Laeeq
Re: Historical Data as Stream
@Soumya Simanta Right now it's just a proof of concept; later I will have a real stream. It's EEG files of the brain. Later it can be used for real-time analysis of EEG streams.

@Mayur The size is huge, yes, so it's better to do it in a distributed manner, and as I said above, I want to read it as a stream because later I will have stream data. This is a proof of concept.

Regards,
Laeeq

On Saturday, May 17, 2014 7:03 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:

The real question is why you are looking to consume the file as a stream: 1. Too big to load as an RDD? 2. Need to operate on it in sequential order?
Configuring Spark for reduceByKey on massive data sets
I have had a lot of success with Spark on large datasets, both in terms of performance and flexibility. However, I hit a wall with reduceByKey when the RDD contains billions of items. I am reducing with simple functions like addition for building histograms, so the reduction process should be constant memory. I am using tens of AWS EC2 machines with 60G memory and 30 processors each. After a while the whole process just hangs. I have not been able to isolate the root problem from the logs, but I suspect that the problem is in the shuffling. Simple mapping and filtering transformations work fine, and reducing the data down to 10^8 items makes the reduceByKey go through. What do I need to do to make reduceByKey work for 10^9 items?

thanks
Daniel
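For reference, reduceByKey is roughly a map-side combine, a hash-partitioned shuffle, and a reduce-side combine; a plain-Python sketch of that flow (function names and partition counts here are illustrative, not Spark internals verbatim) shows why a constant-memory combiner should only need one accumulator per distinct key:

```python
def combine_locally(records, combiner):
    """Fold (key, value) pairs into a dict, combining values per key.
    Memory is proportional to distinct keys, not total records."""
    acc = {}
    for k, v in records:
        acc[k] = combiner(acc[k], v) if k in acc else v
    return acc

def shuffle(combined_maps, num_partitions):
    """Hash-partition already-combined pairs into reducer buckets."""
    buckets = [[] for _ in range(num_partitions)]
    for m in combined_maps:
        for k, v in m.items():
            buckets[hash(k) % num_partitions].append((k, v))
    return buckets

def reduce_by_key(partitions, combiner, num_partitions=4):
    map_side = [combine_locally(p, combiner) for p in partitions]       # map-side combine
    return [combine_locally(b, combiner)                                 # reduce-side combine
            for b in shuffle(map_side, num_partitions)]

# Histogram by addition, as in the post:
data = [
    [("a", 1), ("b", 1), ("a", 1)],  # partition 0
    [("a", 1), ("c", 1)],            # partition 1
]
counts = {}
for part in reduce_by_key(data, lambda x, y: x + y):
    counts.update(part)
# counts is {"a": 3, "b": 1, "c": 1}
```

If the combiner really is constant memory, a hang at this scale often comes down to each reducer receiving too much shuffle data; Spark's reduceByKey accepts an explicit numPartitions argument for exactly that knob.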
Benchmarking Graphx
Hi, I want to do some benchmarking tests (run time and memory) for one of the GraphX examples, let's say PageRank, on my single-processor PC to start with.

a) Is there a way to get the total time taken for the execution from start to finish?
b) log4j properties need to be modified to turn off logging, but it's not clear how.
c) How can this be extended to a cluster?
d) Also, how do I quantify the memory overhead if I add more functionality to the execution?
e) Any scripts? Reports generated?

I am new to GraphX and clusters; any help would be appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Benchmarking-Graphx-tp5965.html
Re: Configuring Spark for reduceByKey on massive data sets
Daniel, How many partitions do you have? Are they more or less uniformly distributed? We have similar data volume currently running well on Hadoop MapReduce with roughly 30 nodes. I was planning to test it with Spark. I'm very interested in your findings. - Madhu https://www.linkedin.com/in/msiddalingaiah -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-Spark-for-reduceByKey-on-on-massive-data-sets-tp5966p5967.html
Re: Historical Data as Stream
@Laeeq - please see this example: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala#L47-L49

On Sat, May 17, 2014 at 2:06 PM, Laeeq Ahmed <laeeqsp...@yahoo.com> wrote:

@Soumya Simanta Right now it's just a proof of concept; later I will have a real stream. It's EEG files of the brain. Later it can be used for real-time analysis of EEG streams. @Mayur The size is huge, yes, so it's better to do it in a distributed manner, and as I said above, I want to read it as a stream because later I will have stream data. This is a proof of concept.

Regards,
Laeeq
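Besides the directory-watching approach in the linked example, the "make batches manually" route is also simple: cut the file into fixed-size micro-batches and feed one batch per interval. A rough plain-Python sketch of that batching step (batch size and data are made up for illustration):

```python
def micro_batches(lines, batch_size):
    """Yield successive fixed-size batches from an iterator of lines,
    simulating what each streaming interval would receive."""
    batch = []
    for line in lines:
        batch.append(line.rstrip("\n"))
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch

# Stand-in for the column of a million values:
values = ["%d\n" % i for i in range(10)]
batches = list(micro_batches(iter(values), 4))
# batches is [['0', '1', '2', '3'], ['4', '5', '6', '7'], ['8', '9']]
```

Writing each batch out as a new file into a directory watched by a file stream would replay the historical data as if it were live.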
Re: breeze DGEMM slow in spark
I think maybe it's related to m1.large, because I also tested on my laptop, and there the two cases cost nearly the same amount of time.

my laptop:
model name : Intel(R) Core(TM) i5-3380M CPU @ 2.90GHz
cpu MHz : 2893.549
os: Linux ubuntu 3.11.0-12-generic #19-Ubuntu SMP Wed Oct 9 16:20:46 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/breeze-DGEMM-slow-in-spark-tp5950p5971.html
Text file and shuffle
Hi, I'm new to Spark and I wanted to understand a few things conceptually so that I can optimize my Spark job.

I have a large text file (~14G, 200k lines). This file is available on each worker node of my Spark cluster. The job I run calls sc.textFile(...).flatMap(...). The function that I pass into flatMap splits up each line from the file into a key and a value. Now I have another text file which is smaller in size (~1.5G) but has a lot more lines, because it has more than one value per key, spread across multiple lines. I call the same textFile and flatMap functions on the other file and then call groupByKey to have all values for a key available as a list. Having done this, I then cogroup these 2 RDDs.

I have the following questions:
1. Is this sequence of steps the best way to achieve what I want, i.e. a join across the 2 data sets?
2. I have an 8-node cluster (25 GB memory each). The large file's flatMap spawns about 400-odd tasks, whereas the small file's flatMap only spawns about 30-odd tasks. The large file's flatMap takes about 2-3 mins, and during this time it seems to do about 3G of shuffle write. I want to understand if this shuffle write is something I can avoid. From what I have read, the shuffle write is a disk write. Is that correct? Also, is the reason for the shuffle write the fact that the partitioner ends up having to redistribute the data across the cluster?

Please let me know if I haven't provided enough information. I'm new to Spark, so if you see anything fundamental that I don't understand, please feel free to just point me to a link that provides some detailed information.

Thanks,
Puneet
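For reference on question 1, the semantics of the cogroup step can be sketched in plain Python (the distributed shuffle is elided; this only mirrors the result shape of RDD.cogroup, with made-up sample data):

```python
from collections import defaultdict

def cogroup(left, right):
    """Group values from two (key, value) datasets by key, mirroring
    RDD.cogroup's result shape: {key: ([left values], [right values])}."""
    grouped = defaultdict(lambda: ([], []))
    for k, v in left:
        grouped[k][0].append(v)
    for k, v in right:
        grouped[k][1].append(v)
    return dict(grouped)

big = [("k1", "a"), ("k2", "b")]           # large file: one value per key
small = [("k1", 1), ("k1", 2), ("k2", 3)]  # small file: several values per key
result = cogroup(big, small)
# result is {"k1": (["a"], [1, 2]), "k2": (["b"], [3])}
```

As the sketch suggests, cogroup already collects all values per key from both sides, so a separate groupByKey on the second RDD before cogrouping may be redundant and cost an extra shuffle.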
Re: breeze DGEMM slow in spark
You need to include breeze-natives or netlib:all to load the native libraries. Check the log messages to ensure native libraries are used, especially on the worker nodes. The easiest way to use OpenBLAS is copying the shared library to /usr/lib/libblas.so.3 and /usr/lib/liblapack.so.3. -Xiangrui

On Sat, May 17, 2014 at 8:02 PM, wxhsdp <wxh...@gmail.com> wrote:

I think maybe it's related to m1.large, because I also tested on my laptop, and there the two cases cost nearly the same amount of time. my laptop: model name : Intel(R) Core(TM) i5-3380M CPU @ 2.90GHz cpu MHz : 2893.549 os: Linux ubuntu 3.11.0-12-generic #19-Ubuntu SMP Wed Oct 9 16:20:46 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux