Re: sparse x sparse matrix multiplication

2014-11-05 Thread Wei Tan

I think Xiangrui's ALS code implements certain aspects of it. You may want to
check it out.
Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center




From:   Xiangrui Meng men...@gmail.com
To: Duy Huynh duy.huynh@gmail.com
Cc: user u...@spark.incubator.apache.org
Date:   11/05/2014 01:13 PM
Subject:Re: sparse x sparse matrix multiplication



You can use breeze for local sparse-sparse matrix multiplication and
then define an RDD of sub-matrices

RDD[(Int, Int, CSCMatrix[Double])] (blockRowId, blockColId, sub-matrix)

and then use join and aggregateByKey to implement this feature, which
is the same as in MapReduce.
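For illustration, a minimal sketch of this block approach (it assumes conformable block sizes and that Breeze supports local CSCMatrix multiplication and addition; reduceByKey is used for brevity where aggregateByKey, as suggested above, also works):

import breeze.linalg._
import org.apache.spark.SparkContext._   // pair-RDD operations (Spark 1.x)
import org.apache.spark.rdd.RDD

// Multiply two block-partitioned sparse matrices A and B, each stored as
// (blockRowId, blockColId, sub-matrix).
def multiply(a: RDD[(Int, Int, CSCMatrix[Double])],
             b: RDD[(Int, Int, CSCMatrix[Double])]): RDD[((Int, Int), CSCMatrix[Double])] = {
  val aByCol = a.map { case (i, k, m) => (k, (i, m)) }   // key A's blocks by column id
  val bByRow = b.map { case (k, j, m) => (k, (j, m)) }   // key B's blocks by row id
  aByCol.join(bByRow)                                    // A(i, k) meets B(k, j)
    .map { case (_, ((i, ma), (j, mb))) => ((i, j), ma * mb) }  // local Breeze multiply
    .reduceByKey(_ + _)                                  // sum partial products per output block
}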

-Xiangrui




Re: CUDA in spark, especially in MLlib?

2014-08-28 Thread Wei Tan
Thank you Debasish.

I am fine with either Scala or Java. I would like to get a quick
evaluation of the performance gain, e.g., ALS on GPU. I would like to try
whichever library gets the job done :)

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Debasish Das debasish.da...@gmail.com
To: Frank van Lankvelt f.vanlankv...@onehippo.com, 
Cc: Matei Zaharia matei.zaha...@gmail.com, user 
user@spark.apache.org, Antonio Jesus Navarro ajnava...@stratio.com, 
Chen He airb...@gmail.com, Wei Tan/Watson/IBM@IBMUS
Date:   08/28/2014 12:20 PM
Subject:Re: CUDA in spark, especially in MLlib?



Breeze author David also has a GitHub project on CUDA bindings in
Scala...do you prefer using Java or Scala?
On Aug 27, 2014 2:05 PM, Frank van Lankvelt f.vanlankv...@onehippo.com 
wrote:
you could try looking at ScalaCL [1]; it targets OpenCL rather than
CUDA, but that might be close enough?

cheers, Frank

1. https://github.com/ochafik/ScalaCL




Re: CUDA in spark, especially in MLlib?

2014-08-27 Thread Wei Tan
Thank you all. Actually I was looking at JCUDA. Function-wise this may be
a perfect solution to offload computation to the GPU. We will see how the
performance is, especially with the Java binding.

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Chen He airb...@gmail.com
To: Antonio Jesus Navarro ajnava...@stratio.com, 
Cc: Matei Zaharia matei.zaha...@gmail.com, user@spark.apache.org, 
Wei Tan/Watson/IBM@IBMUS
Date:   08/27/2014 11:03 AM
Subject:Re: CUDA in spark, especially in MLlib?



JCUDA can let you do that in Java
http://www.jcuda.org


On Wed, Aug 27, 2014 at 1:48 AM, Antonio Jesus Navarro 
ajnava...@stratio.com wrote:
Maybe this would interest you:

CPU and GPU-accelerated Machine Learning Library:

https://github.com/BIDData/BIDMach


2014-08-27 4:08 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com:

You should try to find a Java-based library, then you can call it from 
Scala.

Matei

On August 26, 2014 at 6:58:11 PM, Wei Tan (w...@us.ibm.com) wrote:
Hi, I am trying to find a CUDA library in Scala, to see if some matrix
manipulation in MLlib can be sped up.

I googled a bit but found no active projects on Scala+CUDA. CUDA does
support Python, though. Any suggestions on whether this idea makes
sense?

Best regards,
Wei





CUDA in spark, especially in MLlib?

2014-08-26 Thread Wei Tan
Hi, I am trying to find a CUDA library in Scala, to see if some matrix
manipulation in MLlib can be sped up.

I googled a bit but found no active projects on Scala+CUDA. CUDA does
support Python, though. Any suggestions on whether this idea makes
sense?

Best regards,
Wei



Re: MLLib: implementing ALS with distributed matrix

2014-08-17 Thread Wei Tan
Hi Xiangrui, yes, I was not clear in my previous posting. You did the
optimization at the block level (which is brilliant!) so that blocks are
joined first to avoid redundant data transfer.

I have two follow-up questions:

1. When you do rdd_a.join(rdd_b), on which site will the join be done? Say,
if sizeOf(rdd_a) > sizeOf(rdd_b), does Spark move rdd_b to rdd_a (in a
per-block manner) and do the join there?
2. For matrix factorization, there exist some distributed SGD algorithms such
as: http://people.cs.umass.edu/~boduo/publications/2013EDBT-sparkler.pdf .
I plan to do some performance comparisons soon. Any idea which method is
better?


Thanks!

Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Xiangrui Meng men...@gmail.com
To: Wei Tan/Watson/IBM@IBMUS, 
Cc: user@spark.apache.org user@spark.apache.org
Date:   08/04/2014 12:51 AM
Subject:Re: MLLib: implementing ALS with distributed matrix



To be precise, the optimization is not `get all products that are
related to this user` but `get all products that are related to users
inside this block`. So a product factor won't be sent to the same
block more than once. We considered using GraphX to implement ALS,
which is much easier to understand. Right now, GraphX doesn't support
the optimization we use in ALS. But we are definitely looking for
simplification of MLlib's ALS code. If you have some good ideas,
please create a JIRA and we can move our discussion there.
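
As a toy illustration of that optimization (this is not MLlib's actual code; the RDDs and the block function are hypothetical placeholders):

import org.apache.spark.SparkContext._   // pair-RDD operations (Spark 1.x)
import org.apache.spark.rdd.RDD

// Ship each product factor at most once per user block, not once per rating.
val numUserBlocks = 10
def userBlock(userId: Int): Int = userId % numUserBlocks

val ratings: RDD[(Int, Int, Double)] = ???            // (userId, productId, rating) -- placeholder
val productFactors: RDD[(Int, Array[Double])] = ???   // current product factors -- placeholder

// "OutLinks": for each product, the set of user blocks that rated it.
val outLinks = ratings
  .map { case (u, p, _) => (p, Set(userBlock(u))) }
  .reduceByKey(_ ++ _)

// One copy of each factor per interested user block.
val shipped = productFactors.join(outLinks).flatMap { case (p, (factor, blocks)) =>
  blocks.iterator.map(b => (b, (p, factor)))
}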

Best,
Xiangrui

On Sun, Aug 3, 2014 at 8:39 PM, Wei Tan w...@us.ibm.com wrote:
 Hi,

   I wrote my centralized ALS implementation, and read the distributed
 implementation in MLlib. It uses InLink and OutLink to implement functions
 like "get all products which are related to this user", and ultimately
 achieves model distribution.

   If we have a distributed matrix lib, the complex InLink and OutLink 
logic
 can be relatively easily achieved with matrix select-row or 
select-column
 operators. With this InLink and OutLink based implementation, the
 distributed code is quite different and more complex than the 
centralized
 one.

   I have a question: could we move this complexity (InLink and OutLink) to a
 lower distributed matrix manipulation layer, leaving the upper-layer ALS
 algorithm similar to a centralized one? To be more specific, if we could
 make a DoubleMatrix an RDD and optimize its distributed manipulation, we
 could make the ALS algorithm easier to implement.

   Does it make any sense?

   Best regards,
 Wei

 -
 Wei Tan, PhD
 Research Staff Member
 IBM T. J. Watson Research Center
 http://researcher.ibm.com/person/us-wtan





Re: MLLib: implementing ALS with distributed matrix

2014-08-17 Thread Wei Tan
Hi Deb, thanks for sharing your result. Please find my comments inline in 
blue.
Best regards,
Wei




From:   Debasish Das debasish.da...@gmail.com
To: Wei Tan/Watson/IBM@IBMUS, 
Cc: Xiangrui Meng men...@gmail.com, user@spark.apache.org 
user@spark.apache.org
Date:   08/17/2014 08:15 PM
Subject:Re: MLLib: implementing ALS with distributed matrix



Hi Wei,

Sparkler code was not available for benchmarking, so I picked up
Jellyfish, which uses SGD. If you look at the paper, the ideas are very
similar to the Sparkler paper, but Jellyfish is on shared memory and uses C
code while Sparkler was built on top of Spark...Jellyfish used some
interesting regularization like L1-ball projection, and in fact the paper
produced the best RMSE for the Netflix dataset using these ideas...

Jellyfish was a good runtime baseline to compare against...Of course the 
issue with shared memory is scalability...

I know the authors of Sparkler and I may be able to get the code. The
issue is that it needs a change to Spark by adding the CM (Carousel Map),
which makes the approach less appealing. For performance numbers, it seems
that their run-time on a 40-machine cluster is similar to what I got using
MLlib ALS on a single 32-core machine. But this comparison is very rough and
not so scientific -- just for your information.

For a very brief back-of-the-envelope comparison (and correct me if I am
wrong), it seems that Sparkler does not need to shape the rating matrix R
in two ways, R and R'. It also involves less data movement in each epoch
-- each product block only needs to move once (as a whole) to one
destination user block, whereas in ALS each product block needs to be split
and moved to many user blocks. So I wonder why it does not provide better
performance than ALS.
 
Sparkler was written when the ALS version that we have right now in mllib
did not exist. The blocking idea used in Sparkler is not very different
from what we have in ALS right now. But Sparkler used a faster
checkpointing method (they called it the Carousel Map) which is equally
applicable to mllib ALS. Right now we checkpoint to hdfs, but an efficient
hot distributed cache (like memcached, redis or cassandra) would help
improve the runtime further, at the cost of additional dependencies...right
now mllib assumes hdfs and tachyon as the checkpoint solutions...

In my experiments I found distributed Spark ALS on 16 cores (8 nodes, each
running 2 cores) to be around 2x slower than Jellyfish SGD also
running on 16 cores in shared memory. The biggest machine we had has 16
cores. Note that we factored 20M users and 3M products with ~400M
ratings; 2x was a good number for me. I also found that Spark ALS was 6-7x
faster than Oryx (a scalable version of Mahout ALS).

In terms of RMSE, SGD produced RMSE very similar to ALS in my experiments
with the Netflix dataset. On new datasets, I still run Jellyfish to check
whether SGD produces better local minima than ALS; if it does on some
dataset, that is a motivation to implement DSGD/Sparkler...I have cleaned up
some Netflix-specific optimizations from Jellyfish. I also implemented a
non-negative variant (a similar projected-gradient algorithm as used in
NNLS.scala), but with SGD, implementing a projected gradient and getting
good convergence turned out to be difficult. If you want, I can open source
the code.

I have not run JellyFish yet. However, their paper mentioned a 3 min
run-time on the Netflix data set. I also ran the Netflix data in MLlib ALS
(rank 30, no implicit preference). ALS does not seem to improve after 5
iterations, which take 4-5 minutes. The machine has 32 cores but I
allocate 16 to ALS. Still, the ALS machine is more powerful than the one in
JellyFish's paper, which has 12 cores. So my result seems to be consistent
with your 2x (JellyFish vs. MLlib ALS) number.

W.r.t. RMSE, I can never get a number below 0.85, regardless of the
configuration I use (I tried different ranks, lambdas, and iteration counts).
However, JellyFish is able to achieve 0.8 RMSE. I wonder what RMSE you
obtained and what ALS configuration you used.

In general, a shared-memory implementation should always be faster than
a MapReduce-like implementation at such a scale. Agree?
 
Spark ALS (and Oryx/Mahout's Taste) represents the problem as least
squares. For explicit feedback no broadcast is needed, and for implicit
feedback a global Gram matrix is computed with a rank x rank broadcast...if
ranks are low, it is not a big issue. Representing the problem as least
squares has other benefits as well, since we can map least squares to a
quadratic program and use the following ideas:

https://issues.apache.org/jira/browse/SPARK-2426
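
For concreteness, the per-user solve behind this least-squares view looks roughly like the Breeze sketch below (Y holds the factors of the items the user rated, r the user's ratings; the exact regularization scaling in MLlib may differ):

import breeze.linalg._

// x_u = (Y^T Y + lambda * I) \ (Y^T r): a k x k Gram matrix plus a small linear solve.
def solveUser(Y: DenseMatrix[Double], r: DenseVector[Double], lambda: Double): DenseVector[Double] = {
  val k = Y.cols
  (Y.t * Y + DenseMatrix.eye[Double](k) * lambda) \ (Y.t * r)
}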

We are doing a more thorough analysis of our Spark ALS variants against
Jellyfish on shared memory...I will point you to the results when they are
available..

I read through this thread. You have already done a lot of implementation
work on this. Thanks for sharing, and I will spend some time reading it.

I feel

RE: executor-cores vs. num-executors

2014-07-16 Thread Wei Tan
Thanks for sharing your experience. I had the same experience -- multiple 
moderate JVMs beat a single huge JVM.

Besides the minor JVM startup overhead, is it always better to have 
multiple JVMs rather than a single one?

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   innowireless TaeYun Kim taeyun@innowireless.co.kr
To: user@spark.apache.org, 
Date:   07/16/2014 05:04 AM
Subject:RE: executor-cores vs. num-executors



Thanks.
 
Indeed, now that I compare the stage data of the two jobs, ‘core7-exec3’ spends 
about 12.5 minutes more than ‘core2-exec12’ on GC.
 
From: Nishkam Ravi [mailto:nr...@cloudera.com] 
Sent: Wednesday, July 16, 2014 5:28 PM
To: user@spark.apache.org
Subject: Re: executor-cores vs. num-executors
 
I think two small JVMs would often beat a large one due to lower GC 
overhead. 
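
For reference, the two layouts compared above ('core2-exec12' vs. 'core7-exec3') would be expressed roughly like this on YARN; the memory values are made up for illustration, not taken from the thread:

spark-submit --num-executors 12 --executor-cores 2 --executor-memory 4g ...   # core2-exec12: many small executors
spark-submit --num-executors 3  --executor-cores 7 --executor-memory 16g ...  # core7-exec3: a few large executors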



parallel stages?

2014-07-15 Thread Wei Tan
Hi, I wonder: if I do wordcount on two different files, like this:

val file1 = sc.textFile("/...")
val file2 = sc.textFile("/...")


val wc1 = file1.flatMap(...).reduceByKey(_ + _, 1)
val wc2 = file2.flatMap(...).reduceByKey(_ + _, 1)

wc1.saveAsTextFile("titles.out")
wc2.saveAsTextFile("tables.out")

Would the two reduceByKey stages run in parallel given sufficient 
capacity?

Best regards,
Wei


-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan

Re: parallel stages?

2014-07-15 Thread Wei Tan
Thanks Sean. In Oozie you can use fork-join; however, if Oozie drives
Spark jobs, the jobs will not be able to share RDDs (am I right? I think
multiple jobs submitted by Oozie will have different contexts).

I wonder whether Spark wants to add more workflow features in the future.

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Sean Owen so...@cloudera.com
To: user@spark.apache.org, 
Date:   07/15/2014 04:37 PM
Subject:Re: parallel stages?



The last two lines are what trigger the operations, and they will each
block until the result is computed and saved. So if you execute this
code as-is, no. You could write a Scala program that invokes these two
operations in parallel, like:

Array((wc1, "titles.out"), (wc2, "tables.out")).par.foreach { case
  (wc, path) => wc.saveAsTextFile(path) }

It worked for me, and I think it's OK to do this if you know you want to.
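
An equivalent sketch with Scala Futures (not from the original reply) submits the two save jobs from separate driver threads, so the scheduler can overlap them given enough capacity:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each saveAsTextFile blocks only its own thread; the two Spark jobs can overlap.
val jobs = Seq(
  Future { wc1.saveAsTextFile("titles.out") },
  Future { wc2.saveAsTextFile("tables.out") }
)
jobs.foreach(Await.result(_, Duration.Inf))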

On Tue, Jul 15, 2014 at 8:38 PM, Wei Tan w...@us.ibm.com wrote:
 Hi, I wonder if I do wordcount on two different files, like this:

 val file1 = sc.textFile("/...")
 val file2 = sc.textFile("/...")


 val wc1 = file1.flatMap(...).reduceByKey(_ + _, 1)
 val wc2 = file2.flatMap(...).reduceByKey(_ + _, 1)

 wc1.saveAsTextFile("titles.out")
 wc2.saveAsTextFile("tables.out")

 Would the two reduceByKey stages run in parallel given sufficient 
capacity?

 Best regards,
 Wei


 -
 Wei Tan, PhD
 Research Staff Member
 IBM T. J. Watson Research Center
 http://researcher.ibm.com/person/us-wtan




Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Wei Tan
Just curious: how about using Scala to drive the workflow? I guess if you
use other tools (Oozie, etc.) you lose the advantage of reading from RDDs --
you have to read from HDFS.

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   k.tham kevins...@gmail.com
To: u...@spark.incubator.apache.org, 
Date:   07/10/2014 01:20 PM
Subject:Recommended pipeline automation tool? Oozie?



I'm just wondering what's the general recommendation for data pipeline
automation.

Say, I want to run Spark Job A, then B, then invoke script C, then do D, 
and
if D fails, do E, and if Job A fails, send email F, etc...

It looks like Oozie might be the best choice. But I'd like some
advice/suggestions.

Thanks!







Re: rdd.cache() is not faster?

2014-06-18 Thread Wei Tan
Hi Gaurav, thanks for your pointer. The observation in the link is (at
least qualitatively) similar to mine.

Now the question is: if I do have big data (40 GB, cached size 60 GB) and
even bigger memory (192 GB), can I still not benefit from the RDD cache, and
should I instead persist on disk and leverage the filesystem cache?

I will try more workers so that each JVM has a smaller heap.
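
For reference, the persist-on-disk option mentioned above would look roughly like the sketch below (the path is a placeholder; whether it actually helps depends on the workload):

import org.apache.spark.storage.StorageLevel

// Serialized, disk-backed persistence instead of the default MEMORY_ONLY cache(),
// to shrink the cached footprint and reduce GC pressure.
val file = sc.textFile("/path/to/40G/file")
file.persist(StorageLevel.MEMORY_AND_DISK_SER)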

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Gaurav Jain ja...@student.ethz.ch
To: u...@spark.incubator.apache.org, 
Date:   06/18/2014 06:30 AM
Subject:Re: rdd.cache() is not faster?



You cannot assume that caching would always reduce the execution time,
especially if the data-set is large. It appears that if too much memory is
used for caching, then less memory is left for the actual computation
itself. There has to be a balance between the two. 

Page 33 of this thesis from KTH talks about this:
http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf

Best



-
Gaurav Jain
Master's Student, D-INFK
ETH Zurich




rdd.cache() is not faster?

2014-06-17 Thread Wei Tan
Hi, I have a 40G file which is a concatenation of multiple documents. I
want to extract two features (titles and tables) from each doc, so the
program looks like this:

-
val file = sc.textFile("/path/to/40G/file")
//file.cache()   //to enable or disable cache


val titles = file.map(line => {
  // reduce 1; here I use text utility functions written in Java
  (doc_key, getTitle())
}).reduceByKey(_ + _, 1)


val tables = file.flatMap(line => {
  // reduce 2; here I use text utility functions written in Java
  for (table <- all_tables)
    yield (doc_key, getTableTitle())
}).reduceByKey(_ + _, 1)

titles.saveAsTextFile("titles.out")   //save_1, will trigger reduce_1
tables.saveAsTextFile("tables.out")   //save_2, will trigger reduce_2
-

I expect that with file.cache(), (the later) reduce_2 should be faster
since it will read from cached data. However, results repeatedly show
that reduce_2 takes 3 min with cache and 1.4 min without cache. Why does
reading from the cache not help in this case?

The stage GUI shows that, with cache, reduce_2 always has a wave of outlier
tasks, where the median latency is 2 s but the max is 1.7 min.

Metric                      Min     25th pct   Median   75th pct   Max
Result serialization time   0 ms    0 ms       0 ms     0 ms       1 ms
Duration                    0.6 s   2 s        2 s      2 s        1.7 min

But these tasks do not have a long GC pause (26 ms as shown):

Task 173 (ID 1210): SUCCESS, PROCESS_LOCAL, localhost, launched
2014/06/17 17:49:43, duration 1.7 min, GC time 26 ms, 9.4 KB


BTW: it is a single machine with 32 cores, 192 GB RAM, SSD, with these 
lines in spark-env.sh

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g
SPARK_JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:MaxPermSize=256m"


Thanks,

Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan

Re: long GC pause during file.cache()

2014-06-16 Thread Wei Tan
Thank you all for the advice, including (1) using CMS GC, (2) using multiple
worker instances, and (3) using Tachyon.

I will try (1) and (2) first and report back what I found.

I will also try JDK 7 with G1 GC.

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Aaron Davidson ilike...@gmail.com
To: user@spark.apache.org, 
Date:   06/15/2014 09:06 PM
Subject:Re: long GC pause during file.cache()



Note also that Java does not work well with very large JVMs due to this 
exact issue. There are two commonly used workarounds:

1) Spawn multiple (smaller) executors on the same machine. This can be 
done by creating multiple Workers (via SPARK_WORKER_INSTANCES in 
standalone mode[1]).
2) Use Tachyon for off-heap caching of RDDs, allowing Spark executors to 
be smaller and avoid GC pauses

[1] See standalone documentation here: 
http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
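
As a hypothetical illustration of option 1), a spark-env.sh that splits the single 180g worker JVM into several smaller ones (the numbers are made up, not a recommendation):

SPARK_WORKER_INSTANCES=6
SPARK_WORKER_CORES=5
SPARK_WORKER_MEMORY=28g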


On Sun, Jun 15, 2014 at 3:50 PM, Nan Zhu zhunanmcg...@gmail.com wrote:
Yes, I think in the spark-env.sh.template, it is listed in the comments 
(didn’t check….) 

Best,

-- 
Nan Zhu

On Sunday, June 15, 2014 at 5:21 PM, Surendranauth Hiraman wrote:
Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?



On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote:
SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you 
don’t mind the WARNING in the logs

you can set spark.executor.extraJavaOptions in your SparkConf object

Best,

-- 
Nan Zhu

On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:
Hi, Wei

You may try to set JVM opts in spark-env.sh as follows to prevent or 
mitigate GC pauses:

export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m"

There are more options you could add, please just Google :) 


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:
Hi, 

  I have a single node (192G RAM) stand-alone spark, with memory 
configuration like this in spark-env.sh 

SPARK_WORKER_MEMORY=180g 
SPARK_MEM=180g 


 In spark-shell I have a program like this: 

val file = sc.textFile("/localpath") //file size is 40G 
file.cache() 


val output = file.map(line => extract something from line) 

output.saveAsTextFile(...) 


When I run this program again and again, or keep cycling through
file.unpersist() -> file.cache() -> output.saveAsTextFile(), the run time
varies a lot, from 1 min to 3 min to 50+ min. Whenever the run-time is more
than 1 min, from the stage monitoring GUI I observe big GC pauses (some can
be 10+ min). Of course when the run-time is normal, say ~1 min, no
significant GC is observed. The behavior seems somewhat random. 

Is there any JVM tuning I should do to prevent this long GC pause from 
happening? 



I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something 
like this: 

root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12 
/usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp 
::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
 
-XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g 
org.apache.spark.deploy.SparkSubmit spark-shell --class 
org.apache.spark.repl.Main 

Best regards, 
Wei 

- 
Wei Tan, PhD 
Research Staff Member 
IBM T. J. Watson Research Center 
http://researcher.ibm.com/person/us-wtan





-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io






Re: long GC pause during file.cache()

2014-06-16 Thread Wei Tan
BTW: nowadays a single machine with huge RAM (200 GB to 1 TB) is really
common, and with virtualization you lose some performance. It would be ideal
to see some best practices on how to use Spark on these state-of-the-art
machines...

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



long GC pause during file.cache()

2014-06-14 Thread Wei Tan
Hi,

  I have a single node (192G RAM) stand-alone spark, with memory 
configuration like this in spark-env.sh

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g


 In spark-shell I have a program like this:

val file = sc.textFile("/localpath") //file size is 40G
file.cache()


val output = file.map(line => extract something from line)

output.saveAsTextFile(...)


When I run this program again and again, or keep cycling through
file.unpersist() -> file.cache() -> output.saveAsTextFile(), the run time
varies a lot, from 1 min to 3 min to 50+ min. Whenever the run-time is more
than 1 min, from the stage monitoring GUI I observe big GC pauses (some can
be 10+ min). Of course when the run-time is normal, say ~1 min, no
significant GC is observed. The behavior seems somewhat random.

Is there any JVM tuning I should do to prevent this long GC pause from 
happening? 



I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something 
like this:

root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12 
/usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp 
::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
 
-XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g 
org.apache.spark.deploy.SparkSubmit spark-shell --class 
org.apache.spark.repl.Main

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan

Re: How to compile a Spark project in Scala IDE for Eclipse?

2014-06-08 Thread Wei Tan
This will make the compilation pass, but you may not be able to run it 
correctly.

I used Maven, adding these two dependencies (I use Hadoop 1); Maven pulled in 
their dependent jars (a lot) for me.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>1.2.1</version>
</dependency>



Best regards,
Wei 

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Krishna Sankar ksanka...@gmail.com
To: user@spark.apache.org, 
Date:   06/08/2014 11:19 AM
Subject:Re: How to compile a Spark project in Scala IDE for 
Eclipse?



Project -> Properties -> Java Build Path -> Add External Jars
Add the /spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar
Cheers
K/


On Sun, Jun 8, 2014 at 8:06 AM, Carter gyz...@hotmail.com wrote:
Hi All,

I just downloaded the Scala IDE for Eclipse. After I created a Spark project
and clicked Run, there was an error on this line of code, "import
org.apache.spark.SparkContext": "object apache is not a member of package
org". I guess I need to import the Spark dependency into Scala IDE for
Eclipse; can anyone tell me how to do it? Thanks a lot.








best practice: write and debug Spark application in scala-ide and maven

2014-06-06 Thread Wei Tan
Hi,

  I am trying to write and debug Spark applications in Scala IDE and 
Maven, and in my code I target a Spark instance at spark://xxx

object App {

  def main(args: Array[String]) {
    println("Hello World!")
    val sparkConf = new SparkConf().setMaster("spark://xxx:7077").setAppName("WordCount")

    val spark = new SparkContext(sparkConf)
    val file = spark.textFile("hdfs://xxx:9000/wcinput/pg1184.txt")
    val counts = file.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://flex05.watson.ibm.com:9000/wcoutput")
  }

}

I added spark-core and hadoop-client as Maven dependencies, so the code 
compiles fine.

When I click run in Eclipse I got this error:

14/06/06 20:52:18 WARN scheduler.TaskSetManager: Loss was due to 
java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: samples.App$$anonfun$2

I googled this error and it seems that I need to package my code into a 
jar file and push it to the Spark nodes. But since I am debugging the code, 
it would be handy if I could quickly see results without packaging and 
uploading jars.

What is the best practice of writing a spark application (like wordcount) 
and debug quickly on a remote spark instance?

Thanks!
Wei


-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan

Re: reuse hadoop code in Spark

2014-06-05 Thread Wei Tan
Thanks Matei.

Using your pointers I can import data from HDFS. What I want to do now is 
something like this in Spark:

---
import myown.mapper

rdd.map (mapper.map)
---

The reason why I want this: myown.mapper is a Java class I already 
developed. I used to run it in Hadoop. It is fairly complex and relies on 
a lot of utility Java classes I wrote. Can I reuse that Java map function 
and port it into Spark?

Best regards,
Wei


-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Matei Zaharia matei.zaha...@gmail.com
To: user@spark.apache.org, 
Date:   06/04/2014 04:28 PM
Subject:Re: reuse hadoop code in Spark



Yes, you can write some glue in Spark to call these. Some functions to 
look at:

- SparkContext.hadoopRDD lets you create an input RDD from an existing 
JobConf configured by Hadoop (including InputFormat, paths, etc)
- RDD.mapPartitions lets you operate on all the values in one partition 
(block) at a time, similar to how Mappers in MapReduce work
- PairRDDFunctions.reduceByKey and groupByKey can be used for aggregation.
- RDD.pipe() can be used to call out to a script or binary, like Hadoop 
Streaming.

A fair number of people have been running both Java and Hadoop Streaming 
apps like this.
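
For illustration, a minimal sketch of that glue (myown.Mapper and its extract() method are hypothetical stand-ins for the existing Java utility code; the class must be Serializable or constructed inside the closure):

import org.apache.spark.rdd.RDD

val lines: RDD[String] = sc.textFile("hdfs://xxx:9000/input")

// Call the existing Java logic record by record (a new mapper per record is wasteful)...
val perRecord = lines.map(line => new myown.Mapper().extract(line))

// ...or once per partition, closer to how a Hadoop Mapper is instantiated.
val perPartition = lines.mapPartitions { iter =>
  val mapper = new myown.Mapper()
  iter.map(line => mapper.extract(line))
}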

Matei

On Jun 4, 2014, at 1:08 PM, Wei Tan w...@us.ibm.com wrote:

Hello, 

  I am trying to use spark in such a scenario: 

  I have code written in Hadoop and now I try to migrate to Spark. The 
mappers and reducers are fairly complex. So I wonder if I can reuse the 
map() functions I already wrote in Hadoop (Java), and use Spark to chain 
them, mixing the Java map() functions with Spark operators? 

  Another related question: can I use binaries as operators, like Hadoop 
Streaming? 

  Thanks! 
Wei 

 



reuse hadoop code in Spark

2014-06-04 Thread Wei Tan
Hello,

  I am trying to use spark in such a scenario:

  I have code written in Hadoop and now I try to migrate to Spark. The 
mappers and reducers are fairly complex. So I wonder if I can reuse the 
map() functions I already wrote in Hadoop (Java), and use Spark to chain 
them, mixing the Java map() functions with Spark operators?

  Another related question: can I use binaries as operators, like Hadoop 
Streaming?

  Thanks!
Wei