Re: sparse x sparse matrix multiplication
I think Xiangrui's ALS code implements certain aspects of it. You may want to check it out. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center

From: Xiangrui Meng men...@gmail.com To: Duy Huynh duy.huynh@gmail.com Cc: user u...@spark.incubator.apache.org Date: 11/05/2014 01:13 PM Subject: Re: sparse x sparse matrix multiplication

You can use breeze for local sparse-sparse matrix multiplication, then define an RDD of sub-matrices, RDD[(Int, Int, CSCMatrix[Double])] (blockRowId, blockColId, sub-matrix), and then use join and aggregateByKey to implement this feature, the same way you would in MapReduce. -Xiangrui
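A minimal sketch of the blocked approach Xiangrui describes, using Breeze's CSCMatrix for the local sparse-sparse products. The block keying, the use of reduceByKey instead of aggregateByKey, and the assumption that Breeze also supports sparse + sparse addition for combining partial products are mine, not from the thread:

import breeze.linalg.CSCMatrix
import org.apache.spark.rdd.RDD

type Block = (Int, Int, CSCMatrix[Double])   // (blockRowId, blockColId, sub-matrix)

def multiplyBlocked(a: RDD[Block], b: RDD[Block]): RDD[Block] = {
  // key A by its column-block id and B by its row-block id, so matching blocks meet in the join
  val aByCol = a.map { case (i, k, m) => (k, (i, m)) }
  val bByRow = b.map { case (k, j, m) => (k, (j, m)) }
  aByCol.join(bByRow)
    .map { case (_, ((i, ma), (j, mb))) => ((i, j), ma * mb) }  // local sparse x sparse product
    .reduceByKey(_ + _)                                         // sum partial products per output block
    .map { case ((i, j), m) => (i, j, m) }
}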
Re: CUDA in spark, especially in MLlib?
Thank you Debasish. I am fine with either Scala or Java. I would like to get a quick evaluation of the performance gain, e.g., ALS on GPU. I would like to try whichever library does the business :) Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Debasish Das debasish.da...@gmail.com To: Frank van Lankvelt f.vanlankv...@onehippo.com, Cc: Matei Zaharia matei.zaha...@gmail.com, user user@spark.apache.org, Antonio Jesus Navarro ajnava...@stratio.com, Chen He airb...@gmail.com, Wei Tan/Watson/IBM@IBMUS Date: 08/28/2014 12:20 PM Subject: Re: CUDA in spark, especially in MLlib?

Breeze author David also has a github project on cuda binding in scala... do you prefer using java or scala?

On Aug 27, 2014 2:05 PM, Frank van Lankvelt f.vanlankv...@onehippo.com wrote: you could try looking at ScalaCL [1], it's targeting OpenCL rather than CUDA, but that might be close enough? cheers, Frank 1. https://github.com/ochafik/ScalaCL

On Wed, Aug 27, 2014 at 7:33 PM, Wei Tan w...@us.ibm.com wrote: Thank you all. Actually I was looking at JCUDA. Function-wise this may be a perfect solution to offload computation to the GPU. We will see how the performance turns out, especially with the Java binding. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Chen He airb...@gmail.com To: Antonio Jesus Navarro ajnava...@stratio.com, Cc: Matei Zaharia matei.zaha...@gmail.com, user@spark.apache.org, Wei Tan/Watson/IBM@IBMUS Date: 08/27/2014 11:03 AM Subject: Re: CUDA in spark, especially in MLlib?

JCUDA can let you do that in Java http://www.jcuda.org

On Wed, Aug 27, 2014 at 1:48 AM, Antonio Jesus Navarro ajnava...@stratio.com wrote: Maybe this would interest you: CPU- and GPU-accelerated Machine Learning Library: https://github.com/BIDData/BIDMach

2014-08-27 4:08 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com: You should try to find a Java-based library, then you can call it from Scala. Matei

On August 26, 2014 at 6:58:11 PM, Wei Tan (w...@us.ibm.com) wrote: Hi, I am trying to find a CUDA library in Scala, to see if some matrix manipulation in MLlib can be sped up. I googled around but found no active projects on Scala+CUDA. Python is supported by CUDA though. Any suggestion on whether this idea makes any sense? Best regards, Wei

-- Amsterdam - Oosteinde 11, 1017 WT Amsterdam Boston - 1 Broadway, Cambridge, MA 02142 US +1 877 414 4776 (toll free) Europe +31(0)20 522 4466 www.onehippo.com
Re: CUDA in spark, especially in MLlib?
Thank you all. Actually I was looking at JCUDA. Function-wise this may be a perfect solution to offload computation to the GPU. We will see how the performance turns out, especially with the Java binding. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Chen He airb...@gmail.com To: Antonio Jesus Navarro ajnava...@stratio.com, Cc: Matei Zaharia matei.zaha...@gmail.com, user@spark.apache.org, Wei Tan/Watson/IBM@IBMUS Date: 08/27/2014 11:03 AM Subject: Re: CUDA in spark, especially in MLlib?

JCUDA can let you do that in Java http://www.jcuda.org

On Wed, Aug 27, 2014 at 1:48 AM, Antonio Jesus Navarro ajnava...@stratio.com wrote: Maybe this would interest you: CPU- and GPU-accelerated Machine Learning Library: https://github.com/BIDData/BIDMach

2014-08-27 4:08 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com: You should try to find a Java-based library, then you can call it from Scala. Matei

On August 26, 2014 at 6:58:11 PM, Wei Tan (w...@us.ibm.com) wrote: Hi, I am trying to find a CUDA library in Scala, to see if some matrix manipulation in MLlib can be sped up. I googled around but found no active projects on Scala+CUDA. Python is supported by CUDA though. Any suggestion on whether this idea makes any sense? Best regards, Wei
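For reference, a rough sketch of what calling the JCublas binding from Scala looks like, modeled on the standard sample code at jcuda.org; the single-precision GEMM, the column-major layout, and the exact method signatures should be double-checked against the JCuda docs for the version installed:

import jcuda.{Pointer, Sizeof}
import jcuda.jcublas.JCublas

// C = A (m x k) * B (k x n), all column-major float arrays, computed on the GPU
def gemmOnGpu(m: Int, n: Int, k: Int, a: Array[Float], b: Array[Float]): Array[Float] = {
  val c = new Array[Float](m * n)
  JCublas.cublasInit()
  val dA = new Pointer(); val dB = new Pointer(); val dC = new Pointer()
  JCublas.cublasAlloc(m * k, Sizeof.FLOAT, dA)
  JCublas.cublasAlloc(k * n, Sizeof.FLOAT, dB)
  JCublas.cublasAlloc(m * n, Sizeof.FLOAT, dC)
  JCublas.cublasSetVector(m * k, Sizeof.FLOAT, Pointer.to(a), 1, dA, 1)
  JCublas.cublasSetVector(k * n, Sizeof.FLOAT, Pointer.to(b), 1, dB, 1)
  JCublas.cublasSgemm('n', 'n', m, n, k, 1.0f, dA, m, dB, k, 0.0f, dC, m)  // C = 1*A*B + 0*C
  JCublas.cublasGetVector(m * n, Sizeof.FLOAT, dC, 1, Pointer.to(c), 1)
  JCublas.cublasFree(dA); JCublas.cublasFree(dB); JCublas.cublasFree(dC)
  JCublas.cublasShutdown()
  c
}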
CUDA in spark, especially in MLlib?
Hi, I am trying to find a CUDA library in Scala, to see if some matrix manipulation in MLlib can be sped up. I googled around but found no active projects on Scala+CUDA. Python is supported by CUDA though. Any suggestion on whether this idea makes any sense? Best regards, Wei
Re: MLLib: implementing ALS with distributed matrix
Hi Xiangrui, yes, I was not clear in my previous posting. You did the optimization at block level (which is brilliant!) so that blocks are joined first to avoid redundant data transfer. I have two follow-up questions:

(1) When you do rdd_a.join(rdd_b), on which site will the join be done? Say, if sizeOf(rdd_a) > sizeOf(rdd_b), does Spark move rdd_b to rdd_a (in a per-block manner) and do the join there?

(2) For matrix factorization, there exist distributed SGD algorithms such as http://people.cs.umass.edu/~boduo/publications/2013EDBT-sparkler.pdf . I plan to do some performance comparison soon. Any idea which method is better?

Thanks! Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Xiangrui Meng men...@gmail.com To: Wei Tan/Watson/IBM@IBMUS, Cc: user@spark.apache.org user@spark.apache.org Date: 08/04/2014 12:51 AM Subject: Re: MLLib: implementing ALS with distributed matrix

To be precise, the optimization is not `get all products that are related to this user` but `get all products that are related to users inside this block`. So a product factor won't be sent to the same block more than once. We considered using GraphX to implement ALS, which is much easier to understand. Right now, GraphX doesn't support the optimization we use in ALS. But we are definitely looking for simplification of MLlib's ALS code. If you have some good ideas, please create a JIRA and we can move our discussion there. Best, Xiangrui

On Sun, Aug 3, 2014 at 8:39 PM, Wei Tan w...@us.ibm.com wrote: Hi, I wrote my own centralized ALS implementation, and read the distributed implementation in MLlib. It uses InLink and OutLink to implement functions like "get all products which are related to this user", and ultimately achieves model distribution. If we had a distributed matrix library, the complex InLink and OutLink logic could be achieved relatively easily with matrix select-row or select-column operators. With this InLink- and OutLink-based implementation, the distributed code is quite different from, and more complex than, the centralized one. My question: could we move this complexity (InLink and OutLink) to a lower distributed-matrix-manipulation layer, leaving the upper-layer ALS algorithm similar to a centralized one? To be more specific, if we can make a DoubleMatrix an RDD and optimize its distributed manipulation, we can make the ALS algorithm easier to implement. Does it make any sense? Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
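On question (1), a small, hedged illustration of how join locality can be steered in Spark: if rdd_a already has a known partitioner (and is cached), join reuses that partitioning, so only rdd_b is shuffled to where rdd_a's partitions live. The partition count below is an arbitrary example:

import org.apache.spark.HashPartitioner

val aPartitioned = rdd_a.partitionBy(new HashPartitioner(64)).cache()
// aPartitioned is not re-shuffled here; rdd_b's blocks are moved to aPartitioned's partitions
val joined = aPartitioned.join(rdd_b)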
Re: MLLib: implementing ALS with distributed matrix
Hi Deb, thanks for sharing your results. Please find my comments inline, marked [Wei] (originally in blue). Best regards, Wei

From: Debasish Das debasish.da...@gmail.com To: Wei Tan/Watson/IBM@IBMUS, Cc: Xiangrui Meng men...@gmail.com, user@spark.apache.org user@spark.apache.org Date: 08/17/2014 08:15 PM Subject: Re: MLLib: implementing ALS with distributed matrix

Hi Wei, Sparkler code was not available for benchmarking and so I picked up Jellyfish, which uses SGD. If you look at the paper, the ideas are very similar to the Sparkler paper, but Jellyfish runs on shared memory and uses C code while Sparkler was built on top of Spark... Jellyfish used some interesting regularization like L1-ball projection, and in fact the paper produced the best RMSE for the Netflix dataset using these ideas... Jellyfish was a good runtime baseline to compare against... Of course the issue with shared memory is scalability...

[Wei] I know the authors of Sparkler and I may be able to get the code. The issue is that it needs a change to Spark by adding the CM, which makes the approach less appealing. For performance numbers, it seems that their run-time on a 40-machine cluster is similar to what I got using MLlib ALS on a single 32-core machine. But this comparison is very rough and not at all scientific -- just for your information. For a very brief back-of-envelope comparison (and correct me if I am wrong), it seems that Sparkler does not need to shape the rating matrix R in two ways (R and R'). It also involves less data movement in each epoch -- each product block only needs to move once (as a whole) to one destination user block, whereas in ALS each product block needs to be split and moved to many user blocks. So I wonder why it does not provide better performance than ALS.

Sparkler was written when the ALS version that we have right now in mllib did not exist. The blocking idea used in Sparkler is not very different from what we have in ALS right now. But Sparkler used a faster checkpointing method (they called it Carousel Map) which is equally applicable to mllib ALS. Right now we checkpoint to hdfs, but an efficient hot distributed cache (like memcached, redis or cassandra) would help improve the runtime further, at the cost of additional dependencies... right now mllib assumes hdfs and tachyon as the checkpoint solution...

In my experiments I found distributed Spark ALS on 16 cores (8 nodes, each running 2 cores) to be around 2x slower than Jellyfish SGD also running on 16 cores in shared memory. The biggest machine we had is 16 cores. Note that we factored 20M users and 3M products with ~400M ratings. 2x was a good number for me. I also found Spark ALS to be 6-7x faster than Oryx (a scalable version of Mahout ALS). In terms of RMSE, SGD produced RMSE very similar to ALS in my experiments with the Netflix dataset. On new datasets, I still run Jellyfish to check whether SGD produces a better local minimum than ALS; if it does, that's a motivation to implement DSGD/Sparkler... I have cleaned up some Netflix-specific optimizations from Jellyfish. I also implemented a non-negative variant (the same projected-gradient algorithm as used in NNLS.scala), but with SGD, implementing a projected gradient and getting good convergence turned out to be difficult. If you want I can open source the code.

[Wei] I have not run Jellyfish yet. However, their paper mentions a 3-minute run-time on the Netflix dataset. I also ran the Netflix data in MLlib ALS (rank 30, no implicit preference). ALS does not seem to improve after 5 iterations, which take 4-5 minutes. My machine has 32 cores but I allocate 16 to ALS. It is more powerful than the machine in the Jellyfish paper, which has 12 cores. So my result seems consistent with your 2x (Jellyfish vs. MLlib ALS) number. W.r.t. RMSE, I can never get a number below 0.85, regardless of the configuration I use (I tried different ranks, lambdas, and iteration counts), whereas Jellyfish is able to achieve 0.8 RMSE. I wonder what RMSE you obtained and what ALS configuration you used. In general, a shared-memory implementation should always be faster than a MapReduce-like implementation at this scale. Agree?

Spark ALS (and Oryx/Mahout's Taste) represents the problem as least squares. For the explicit case no broadcast is needed, and for the implicit case a global Gram matrix of size rank x rank is computed and broadcast... if ranks are low, it is not a big issue. Representing the problem as least squares has other benefits as well, since we can map least squares to a quadratic program and use the following ideas: https://issues.apache.org/jira/browse/SPARK-2426 We are doing a much more thorough analysis of our Spark ALS variants against Jellyfish on shared memory... I will point you to the results when they are available...

[Wei] I read through this thread. You have already done a lot of implementation work on this. Thanks for sharing; I will spend some time reading this. I feel
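For reference, a sketch of the MLlib ALS run being discussed (rank 30, explicit feedback) together with a training-set RMSE computation. The input path, delimiter, iteration count, and lambda value are placeholders, not the settings used in the thread:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

val ratings = sc.textFile("hdfs:///netflix/ratings.txt").map { line =>
  val Array(user, movie, rating) = line.split(',')
  Rating(user.toInt, movie.toInt, rating.toDouble)
}.cache()

val model = ALS.train(ratings, 30, 10, 0.065)   // rank = 30, 10 iterations, lambda = 0.065

// RMSE on the training set; use a held-out probe set for a fair comparison with Jellyfish
val predictions = model.predict(ratings.map(r => (r.user, r.product)))
  .map(r => ((r.user, r.product), r.rating))
val squaredErrors = ratings.map(r => ((r.user, r.product), r.rating))
  .join(predictions)
  .map { case (_, (actual, predicted)) => (actual - predicted) * (actual - predicted) }
val rmse = math.sqrt(squaredErrors.mean())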
RE: executor-cores vs. num-executors
Thanks for sharing your experience. I have had the same experience -- multiple moderate JVMs beat a single huge JVM. Besides the minor overhead of starting more JVMs, is it always better to have multiple JVMs rather than a single one? Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: innowireless TaeYun Kim taeyun@innowireless.co.kr To: user@spark.apache.org, Date: 07/16/2014 05:04 AM Subject: RE: executor-cores vs. num-executors

Thanks. Really, comparing the stage data of the two jobs, ‘core7-exec3’ spends about 12.5 minutes more than ‘core2-exec12’ on GC.

From: Nishkam Ravi [mailto:nr...@cloudera.com] Sent: Wednesday, July 16, 2014 5:28 PM To: user@spark.apache.org Subject: Re: executor-cores vs. num-executors

I think two small JVMs would often beat a large one due to lower GC overhead.
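For concreteness, the two layouts being compared map to spark-submit flags roughly like this (assuming YARN, where --num-executors applies; the memory sizes are illustrative only):

# a few fat executors: larger heaps, typically more GC pressure
spark-submit --num-executors 3 --executor-cores 7 --executor-memory 24g --class MyApp myapp.jar

# many small executors over the same cores: smaller heaps, usually less time lost to GC
spark-submit --num-executors 12 --executor-cores 2 --executor-memory 6g --class MyApp myapp.jar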
parallel stages?
Hi, I wonder: if I do wordcount on two different files, like this:

val file1 = sc.textFile("/...")
val file2 = sc.textFile("/...")
val wc1 = file1.flatMap(...).reduceByKey(_ + _, 1)
val wc2 = file2.flatMap(...).reduceByKey(_ + _, 1)
wc1.saveAsTextFile("titles.out")
wc2.saveAsTextFile("tables.out")

would the two reduceByKey stages run in parallel given sufficient capacity? Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
Re: parallel stages?
Thanks Sean. In Oozie you can use fork-join; however, when using Oozie to drive Spark jobs, the jobs will not be able to share RDDs (am I right? I think multiple jobs submitted by Oozie will have different contexts). I wonder if Spark will add more workflow features in the future. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Sean Owen so...@cloudera.com To: user@spark.apache.org, Date: 07/15/2014 04:37 PM Subject: Re: parallel stages?

The last two lines are what trigger the operations, and they will each block until the result is computed and saved. So if you execute this code as-is, no. You could write a Scala program that invokes these two operations in parallel, like:

Array((wc1, "titles.out"), (wc2, "tables.out")).par.foreach { case (wc, path) => wc.saveAsTextFile(path) }

It worked for me, and I think it's OK to do this if you know you want to.

On Tue, Jul 15, 2014 at 8:38 PM, Wei Tan w...@us.ibm.com wrote: Hi, I wonder: if I do wordcount on two different files, like this:

val file1 = sc.textFile("/...")
val file2 = sc.textFile("/...")
val wc1 = file1.flatMap(...).reduceByKey(_ + _, 1)
val wc2 = file2.flatMap(...).reduceByKey(_ + _, 1)
wc1.saveAsTextFile("titles.out")
wc2.saveAsTextFile("tables.out")

would the two reduceByKey stages run in parallel given sufficient capacity? Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
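An equivalent way to get the same concurrency, sketched here with Scala Futures: Spark's scheduler accepts job submissions from multiple driver threads, so the two save actions can run at the same time if the cluster has spare capacity. The Await at the end simply keeps the driver alive until both jobs finish:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val f1 = Future { wc1.saveAsTextFile("titles.out") }   // job 1, submitted from one thread
val f2 = Future { wc2.saveAsTextFile("tables.out") }   // job 2, submitted from another
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)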
Re: Recommended pipeline automation tool? Oozie?
Just curious: how about using Scala to drive the workflow? I guess if you use other tools (Oozie, etc.) you lose the advantage of reading from RDDs -- you have to read from HDFS. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: k.tham kevins...@gmail.com To: u...@spark.incubator.apache.org, Date: 07/10/2014 01:20 PM Subject: Recommended pipeline automation tool? Oozie?

I'm just wondering what's the general recommendation for data pipeline automation. Say, I want to run Spark Job A, then B, then invoke script C, then do D, and if D fails, do E, and if Job A fails, send email F, etc... It looks like Oozie might be the best choice. But I'd like some advice/suggestions. Thanks!
Re: rdd.cache() is not faster?
Hi Gaurav, thanks for the pointer. The observation in the link is (at least qualitatively) similar to mine. Now the question is: if I do have big data (40 GB raw, 60 GB cached) and even bigger memory (192 GB), can I still not benefit from the RDD cache, and should I instead persist on disk and leverage the filesystem cache? I will try more workers so that each JVM has a smaller heap. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Gaurav Jain ja...@student.ethz.ch To: u...@spark.incubator.apache.org, Date: 06/18/2014 06:30 AM Subject: Re: rdd.cache() is not faster?

You cannot assume that caching would always reduce the execution time, especially if the data set is large. It appears that if too much memory is used for caching, then less memory is left for the actual computation itself. There has to be a balance between the two. Page 33 of this thesis from KTH talks about this: http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf Best - Gaurav Jain Master's Student, D-INFK ETH Zurich
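A small sketch of the middle-ground storage levels worth trying here: serialized caching keeps far fewer live Java objects on the heap (less GC), and DISK_ONLY leaves the data to the OS filesystem cache entirely. The storage level choice below is a suggestion, not something from the thread:

import org.apache.spark.storage.StorageLevel

val file = sc.textFile("/path/to/40G/file")
  .persist(StorageLevel.MEMORY_ONLY_SER)   // serialized in memory: more CPU, much less GC
// or: .persist(StorageLevel.MEMORY_AND_DISK_SER)
// or: .persist(StorageLevel.DISK_ONLY)    // rely on the OS page cache instead of the JVM heap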
rdd.cache() is not faster?
Hi, I have a 40G file which is a concatenation of multiple documents, and I want to extract two features (title and tables) from each doc, so the program is like this:

-
val file = sc.textFile("/path/to/40G/file")
//file.cache()  // to enable or disable cache

val titles = file.map(line => {
  (doc_key, getTitle())      // here I use text utility functions written in Java
}).reduceByKey(_ + _, 1)     // reduce 1

val tables = file.flatMap(line => {
  for (table <- all_tables) yield (doc_key, getTableTitle())   // here I use text utility functions written in Java
}).reduceByKey(_ + _, 1)     // reduce 2

titles.saveAsTextFile("titles.out")   // save_1, will trigger reduce_1
tables.saveAsTextFile("tables.out")   // save_2, will trigger reduce_2
-

I expect that with file.cache(), the later reduce_2 should be faster since it will read from cached data. However, results repeatedly show that reduce_2 takes 3 min with cache and 1.4 min without cache. Why does reading from the cache not help in this case?

The stage GUI shows that, with cache, reduce_2 always has a wave of outlier tasks, where the median latency is 2 s but the max is 1.7 min:

Metric                     Min     25th percentile  Median  75th percentile  Max
Result serialization time  0 ms    0 ms             0 ms    0 ms             1 ms
Duration                   0.6 s   2 s              2 s     2 s              1.7 min

But these tasks do not have a long GC pause (26 ms as shown):

173 1210 SUCCESS PROCESS_LOCAL localhost 2014/06/17 17:49:43 1.7 min 26 ms 9.4 KB

BTW: it is a single machine with 32 cores, 192 GB RAM, and SSD, with these lines in spark-env.sh:

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g
SPARK_JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:MaxPermSize=256m"

Thanks, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
Re: long GC pause during file.cache()
Thank you all for the advice, including (1) using the CMS GC, (2) using multiple worker instances and (3) using Tachyon. I will try (1) and (2) first and report back what I find. I will also try JDK 7 with the G1 GC. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Aaron Davidson ilike...@gmail.com To: user@spark.apache.org, Date: 06/15/2014 09:06 PM Subject: Re: long GC pause during file.cache()

Note also that Java does not work well with very large JVMs due to this exact issue. There are two commonly used workarounds: 1) Spawn multiple (smaller) executors on the same machine. This can be done by creating multiple Workers (via SPARK_WORKER_INSTANCES in standalone mode [1]). 2) Use Tachyon for off-heap caching of RDDs, allowing Spark executors to be smaller and avoid GC pauses. [1] See standalone documentation here: http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts

On Sun, Jun 15, 2014 at 3:50 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Yes, I think in the spark-env.sh.template, it is listed in the comments (didn’t check….) Best, -- Nan Zhu

On Sunday, June 15, 2014 at 5:21 PM, Surendranauth Hiraman wrote: Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?

On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote: SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you don’t mind the WARNING in the logs; you can set spark.executor.extraJavaOptions in your SparkConf object. Best, -- Nan Zhu

On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote: Hi, Wei You may try to set JVM opts in spark-env.sh as follows to prevent or mitigate GC pauses:

export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m"

There are more options you could add, please just Google :) Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240 Email: wh.s...@gmail.com

On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote: Hi, I have a single-node (192 GB RAM) stand-alone Spark, with memory configuration like this in spark-env.sh:

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g

In spark-shell I have a program like this:

val file = sc.textFile("/localpath")   // file size is 40G
file.cache()
val output = file.map(line => /* extract something from line */)
output.saveAsTextFile(...)

When I run this program again and again, or keep trying file.unpersist() -> file.cache() -> output.saveAsTextFile(), the run time varies a lot, from 1 min to 3 min to 50+ min. Whenever the run time is more than 1 min, from the stage monitoring GUI I observe a big GC pause (some can be 10+ min). Of course when the run time is normal, say ~1 min, no significant GC is observed. The behavior seems somewhat random. Is there any JVM tuning I should do to prevent this long GC pause from happening?

I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something like this:

root 10994 1.7 0.6 196378000 1361496 pts/51 Sl+ 22:06 0:12 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main

Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

-- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hira...@velos.io W: www.velos.io
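Putting workaround (1) together with the CMS suggestion, a hedged spark-env.sh sketch for this 192 GB standalone box; the split into 6 workers and the exact heap/core numbers are illustrative, not recommendations from the thread:

# spark-env.sh: several smaller worker JVMs instead of one 180g heap
SPARK_WORKER_INSTANCES=6
SPARK_WORKER_CORES=5
SPARK_WORKER_MEMORY=30g
# CMS tends to give shorter pauses than the default collector on large heaps
SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:MaxPermSize=256m"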
Re: long GC pause during file.cache()
BTW: nowadays a single machine with huge RAM (200 GB to 1 TB) is really common, and with virtualization you lose some performance. It would be ideal to see some best practices on how to use Spark on these state-of-the-art machines... Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Wei Tan/Watson/IBM@IBMUS To: user@spark.apache.org, Date: 06/16/2014 10:47 AM Subject: Re: long GC pause during file.cache()

Thank you all for the advice, including (1) using the CMS GC, (2) using multiple worker instances and (3) using Tachyon. I will try (1) and (2) first and report back what I find. I will also try JDK 7 with the G1 GC. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
long GC pause during file.cache()
Hi, I have a single-node (192 GB RAM) stand-alone Spark, with memory configuration like this in spark-env.sh:

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g

In spark-shell I have a program like this:

val file = sc.textFile("/localpath")   // file size is 40G
file.cache()
val output = file.map(line => /* extract something from line */)
output.saveAsTextFile(...)

When I run this program again and again, or keep trying file.unpersist() -> file.cache() -> output.saveAsTextFile(), the run time varies a lot, from 1 min to 3 min to 50+ min. Whenever the run time is more than 1 min, from the stage monitoring GUI I observe a big GC pause (some can be 10+ min). Of course when the run time is normal, say ~1 min, no significant GC is observed. The behavior seems somewhat random. Is there any JVM tuning I should do to prevent this long GC pause from happening?

I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something like this:

root 10994 1.7 0.6 196378000 1361496 pts/51 Sl+ 22:06 0:12 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main

Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
Re: How to compile a Spark project in Scala IDE for Eclipse?
This will make the compilation pass, but you may not be able to run it correctly. I used Maven, adding these two jars (I use Hadoop 1); Maven added their dependent jars (a lot) for me:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>1.2.1</version>
</dependency>

Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Krishna Sankar ksanka...@gmail.com To: user@spark.apache.org, Date: 06/08/2014 11:19 AM Subject: Re: How to compile a Spark project in Scala IDE for Eclipse?

Project -> Properties -> Java Build Path -> Add External Jars. Add the /spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar Cheers K/

On Sun, Jun 8, 2014 at 8:06 AM, Carter gyz...@hotmail.com wrote: Hi All, I just downloaded the Scala IDE for Eclipse. After I created a Spark project and clicked Run, there was an error on the line `import org.apache.spark.SparkContext`: "object apache is not a member of package org". I guess I need to import the Spark dependency into Scala IDE for Eclipse, can anyone tell me how to do it? Thanks a lot.
best practice: write and debug Spark application in scala-ide and maven
Hi, I am trying to write and debug Spark applications in scala-ide and Maven, and in my code I target a Spark instance at spark://xxx:

object App {
  def main(args: Array[String]) {
    println("Hello World!")
    val sparkConf = new SparkConf().setMaster("spark://xxx:7077").setAppName("WordCount")
    val spark = new SparkContext(sparkConf)
    val file = spark.textFile("hdfs://xxx:9000/wcinput/pg1184.txt")
    val counts = file.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://flex05.watson.ibm.com:9000/wcoutput")
  }
}

I added spark-core and hadoop-client to the Maven dependencies so the code compiles fine. When I click Run in Eclipse I get this error:

14/06/06 20:52:18 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: samples.App$$anonfun$2

I googled this error and it seems that I need to package my code into a jar file and push it to the Spark nodes. But since I am debugging the code, it would be handy if I could quickly see results without packaging and uploading jars. What is the best practice for writing a Spark application (like wordcount) and debugging it quickly against a remote Spark instance? Thanks! Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
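Two common ways to work around the ClassNotFoundException while iterating, sketched under the assumption that mvn package produces the jar path shown (the jar name is a placeholder):

// 1) Debug against a local master first: closures run in-process, nothing needs shipping.
val debugConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

// 2) Keep the remote master, but ship the packaged jar containing the anonymous
//    function classes to the executors via setJars.
val remoteConf = new SparkConf()
  .setMaster("spark://xxx:7077")
  .setAppName("WordCount")
  .setJars(Seq("target/myapp-0.0.1-SNAPSHOT.jar"))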
Re: reuse hadoop code in Spark
Thanks Matei. Using your pointers I can import data from HDFS. What I want to do now is something like this in Spark:

---
import myown.mapper
rdd.map(mapper.map)
---

The reason I want this: myown.mapper is a Java class I have already developed. I used to run it in Hadoop. It is fairly complex and relies on a lot of utility Java classes I wrote. Can I reuse the map function written in Java and port it into Spark? Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan

From: Matei Zaharia matei.zaha...@gmail.com To: user@spark.apache.org, Date: 06/04/2014 04:28 PM Subject: Re: reuse hadoop code in Spark

Yes, you can write some glue in Spark to call these. Some functions to look at:

- SparkContext.hadoopRDD lets you create an input RDD from an existing JobConf configured by Hadoop (including InputFormat, paths, etc.)
- RDD.mapPartitions lets you operate on all the values in one partition (block) at a time, similar to how Mappers in MapReduce work
- PairRDDFunctions.reduceByKey and groupByKey can be used for aggregation
- RDD.pipe() can be used to call out to a script or binary, like Hadoop Streaming

A fair number of people have been running both Java and Hadoop Streaming apps like this. Matei

On Jun 4, 2014, at 1:08 PM, Wei Tan w...@us.ibm.com wrote: Hello, I am trying to use Spark in such a scenario: I have code written in Hadoop and now I am trying to migrate to Spark. The mappers and reducers are fairly complex. So I wonder if I can reuse the map() functions I already wrote in Hadoop (Java), and use Spark to chain them, mixing the Java map() functions with Spark operators? Another related question: can I use binaries as operators, like Hadoop Streaming? Thanks! Wei
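A hedged sketch of the glue being discussed. It assumes the existing mapper exposes its per-record logic as an ordinary method (extract below is a hypothetical name) rather than depending on Hadoop's Mapper.Context; instantiating it inside mapPartitions avoids having to make the class Serializable and gives one instance per partition, similar to one Mapper per input split:

val lines = sc.textFile("hdfs://xxx:9000/input")
val results = lines.mapPartitions { iter =>
  val mapper = new myown.mapper()            // reuse the existing Java utility class
  iter.map(line => mapper.extract(line))     // hypothetical per-record method
}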
reuse hadoop code in Spark
Hello, I am trying to use Spark in such a scenario: I have code written in Hadoop and now I am trying to migrate to Spark. The mappers and reducers are fairly complex. So I wonder if I can reuse the map() functions I already wrote in Hadoop (Java), and use Spark to chain them, mixing the Java map() functions with Spark operators? Another related question: can I use binaries as operators, like Hadoop Streaming? Thanks! Wei