Re: GroupByKey results in OOM - Any other alternative

2014-06-15 Thread Surendranauth Hiraman
Vivek,

If the foldByKey solution doesn't work for you, my team uses
RDD.persist(DISK_ONLY) to avoid OOM errors.

It's slower, of course, and requires tuning other config parameters. It can
also be a problem if you do not have enough disk space, meaning that you
have to unpersist at the right points if you are running long flows.
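
A minimal sketch of the pattern (the paths and names here are made up):

import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel

// some intermediate RDD in a long flow
val intermediate = sc.textFile("hdfs:///flows/input")
  .map(line => (line.split(",")(0), line))

// spill partitions to local disk instead of holding them in memory
intermediate.persist(StorageLevel.DISK_ONLY)

val counts = intermediate.mapValues(_ => 1L).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs:///flows/output")

// release the disk space once downstream stages no longer need it
intermediate.unpersist()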

For us, even though the disk writes are a performance hit, we prefer the
Spark programming model to Hadoop M/R. But we are still working on getting
this to work end to end on 100s of GB of data on our 16-node cluster.

Suren



On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS vivek...@gmail.com wrote:

 Thanks for the input. I will give foldByKey a shot.

 The way I am doing is, data is partitioned hourly. So I am computing
 distinct values hourly. Then I use unionRDD to merge them and compute
 distinct on the overall data.

  Is there a way to know which (key, value) pair is resulting in the OOM?
  Is there a way to set parallelism in the map stage so that each worker
 will process one key at a time?

 I didn't realise countApproxDistinctByKey is using HyperLogLogPlus. This
 should be interesting.

 --Vivek


 On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote:

 Grouping by key is always problematic since a key might have a huge
 number of values. You can do a little better than grouping *all* values and
 *then* finding distinct values by using foldByKey, putting values into a
 Set. At least you end up with only distinct values in memory. (You don't
 need two maps either, right?)

 If the number of distinct values is still huge for some keys, consider
 the experimental method countApproxDistinctByKey:
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285

 This should be much more performant at the cost of some accuracy.
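
 A rough, untested sketch of both approaches, assuming an RDD[(String,
 String)] of (key, value) pairs:

 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD

 def distinctCounts(pairs: RDD[(String, String)]): RDD[(String, Long)] =
   pairs
     .mapValues(v => Set(v))
     .foldByKey(Set.empty[String])(_ ++ _) // only distinct values in memory
     .mapValues(_.size.toLong)

 // approximate, but much cheaper for very high-cardinality keys
 def approxDistinctCounts(pairs: RDD[(String, String)]): RDD[(String, Long)] =
   pairs.countApproxDistinctByKey(relativeSD = 0.05)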


 On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS vivek...@gmail.com wrote:

 Hi,
For last couple of days I have been trying hard to get around this
 problem. Please share any insights on solving this problem.

 Problem :
 There is a huge list of (key, value) pairs. I want to transform this to
 (key, distinct values) and then eventually to (key, distinct values count)

 On small dataset

 groupByKey().map( x => (x._1, x._2.distinct)) ...map(x => (x._1,
 x._2.distinct.count))

 On large data set I am getting OOM.

 Is there a way to represent Seq of values from groupByKey as RDD and
 then perform distinct over it ?

 Thanks
 Vivek






-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Using custom class as a key for groupByKey() or reduceByKey()

2014-06-15 Thread Gaurav Jain
I have a simple Java class as follows, that I want to use as a key while
applying groupByKey or reduceByKey functions:

private static class FlowId {
    public String dcxId;
    public String trxId;
    public String msgType;

    public FlowId(String dcxId, String trxId, String msgType) {
        this.dcxId = dcxId;
        this.trxId = trxId;
        this.msgType = msgType;
    }

    public boolean equals(Object other) {
        if (other == this) return true;
        if (other == null) return false;
        if (getClass() != other.getClass()) return false;
        FlowId fid = (FlowId) other;
        if (this.dcxId.equals(fid.dcxId) &&
                this.trxId.equals(fid.trxId) &&
                this.msgType.equals(fid.msgType)) {
            return true;
        }
        return false;
    }
}

I figured that an equals() method would need to be overridden to ensure
comparison of keys, but entries with the same key are still listed
separately after applying a groupByKey(), for example. What further
modifications are necessary to enable use of the above class as a key? Right
now, I have fallen back to using Tuple3<String, String, String> instead of
the FlowId class, but it makes the code unnecessarily verbose.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-custom-class-as-a-key-for-groupByKey-or-reduceByKey-tp7640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using custom class as a key for groupByKey() or reduceByKey()

2014-06-15 Thread Sean Owen
In Java at large, you must always implement hashCode() when you implement
equals(). This is not specific to Spark. It maintains the contract that two
instances that are equal according to equals() have the same hash code, and
that's not the case for your class now. This causes weird things to happen
wherever the hash code contract is depended upon.

This probably works fine:

@Override
public int hashCode() {
  return dcxId.hashCode() ^ trxId.hashCode() ^ msgType.hashCode();
}
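
As an aside, if Scala is an option: a case class derives consistent equals()
and hashCode() for you, so instances work as grouping keys out of the box.

// Scala equivalent of the class above
case class FlowId(dcxId: String, trxId: String, msgType: String)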


On Sun, Jun 15, 2014 at 11:45 AM, Gaurav Jain ja...@student.ethz.ch wrote:

 I have a simple Java class as follows, that I want to use as a key while
 applying groupByKey or reduceByKey functions:

 private static class FlowId {
     public String dcxId;
     public String trxId;
     public String msgType;

     public FlowId(String dcxId, String trxId, String msgType) {
         this.dcxId = dcxId;
         this.trxId = trxId;
         this.msgType = msgType;
     }

     public boolean equals(Object other) {
         if (other == this) return true;
         if (other == null) return false;
         if (getClass() != other.getClass()) return false;
         FlowId fid = (FlowId) other;
         if (this.dcxId.equals(fid.dcxId) &&
                 this.trxId.equals(fid.trxId) &&
                 this.msgType.equals(fid.msgType)) {
             return true;
         }
         return false;
     }
 }

 I figured that an equals() method would need to be overridden to ensure
 comparison of keys, but entries with the same key are still listed
 separately after applying a groupByKey(), for example. What further
 modifications are necessary to enable use of the above class as a key? Right
 now, I have fallen back to using Tuple3<String, String, String> instead of
 the FlowId class, but it makes the code unnecessarily verbose.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Using-custom-class-as-a-key-for-groupByKey-or-reduceByKey-tp7640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: long GC pause during file.cache()

2014-06-15 Thread Hao Wang
Hi, Wei

You may try to set JVM opts in *spark-env.sh* as follows to prevent or
mitigate GC pauses:

export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
-Xmx2g -XX:MaxPermSize=256m"

There are more options you could add, please just Google :)


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:

 Hi,

   I have a single node (192G RAM) stand-alone spark, with memory
 configuration like this in spark-env.sh

 SPARK_WORKER_MEMORY=180g
 SPARK_MEM=180g


  In spark-shell I have a program like this:

 val file = sc.textFile("/localpath") // file size is 40G
 file.cache()


 val output = file.map(line => extract something from line)

 output.saveAsTextFile (...)


 When I run this program again and again, or keep trying file.unpersist()
 --> file.cache() --> output.saveAsTextFile(), the run time varies a lot,
 from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min,
 from the stage monitoring GUI I observe big GC pause (some can be 10+ min).
 Of course when run-time is normal, say ~1 min, no significant GC is
 observed. The behavior seems somewhat random.

 Is there any JVM tuning I should do to prevent this long GC pause from
 happening?



 I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something
 like this:

 root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12
 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp
 ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
 -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g
 org.apache.spark.deploy.SparkSubmit spark-shell --class
 org.apache.spark.repl.Main

 Best regards,
 Wei

 -
 Wei Tan, PhD
 Research Staff Member
 IBM T. J. Watson Research Center
 *http://researcher.ibm.com/person/us-wtan*


Re: long GC pause during file.cache()

2014-06-15 Thread Nan Zhu
SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you don’t mind 
the WARNING in the logs

you can set spark.executor.extraJavaOptions in your SparkConf obj
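
For example (a sketch; the GC flags here are only illustrations):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseConcMarkSweepGC -XX:MaxPermSize=256m")
val sc = new SparkContext(conf)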

Best,

--  
Nan Zhu


On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:

 Hi, Wei
  
 You may try to set JVM opts in spark-env.sh as follows
 to prevent or mitigate GC pauses:
  
 export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
 -Xmx2g -XX:MaxPermSize=256m"
  
 There are more options you could add, please just Google :)  
  
  
 Regards,
 Wang Hao(王灏)
  
 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com
  
  
  
  
  
  
 On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:
  Hi,  
   
   I have a single node (192G RAM) stand-alone spark, with memory
  configuration like this in spark-env.sh
   
  SPARK_WORKER_MEMORY=180g  
  SPARK_MEM=180g  
   
   
   In spark-shell I have a program like this:  
   
  val file = sc.textFile("/localpath") // file size is 40G
  file.cache()  
   
   
  val output = file.map(line => extract something from line)
   
  output.saveAsTextFile (...)  
   
   
  When I run this program again and again, or keep trying file.unpersist() 
  --> file.cache() --> output.saveAsTextFile(), the run time varies a lot,
  from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min, 
  from the stage monitoring GUI I observe big GC pause (some can be 10+ min). 
  Of course when run-time is normal, say ~1 min, no significant GC is 
  observed. The behavior seems somewhat random.  
   
  Is there any JVM tuning I should do to prevent this long GC pause from 
  happening?  
   
   
   
  I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something 
  like this:  
   
  root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12 
  /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp 
  ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
   -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g 
  org.apache.spark.deploy.SparkSubmit spark-shell --class 
  org.apache.spark.repl.Main  
   
  Best regards,  
  Wei  
   
  -  
  Wei Tan, PhD  
  Research Staff Member  
  IBM T. J. Watson Research Center  
  http://researcher.ibm.com/person/us-wtan



Re: MLLib : Decision Tree not getting built for 5 or more levels (maxDepth=5) and the one built for 3 levels is performing poorly

2014-06-15 Thread Manish Amde
Hi Suraj,

I don't see any logs from mllib. You might need to explicitly set the
logging level to DEBUG for mllib. Adding this line to log4j.properties might
fix the problem:
log4j.logger.org.apache.spark.mllib.tree=DEBUG

Also, please let me know whether you encounter similar problems with the
Spark master.

-Manish


On Sat, Jun 14, 2014 at 3:19 AM, SURAJ SHETH shet...@gmail.com wrote:

 Hi Manish,
 Thanks for your reply.

 I am attaching the logs here (regression, 5 levels). It contains the last
 few hundred lines. Also, I am attaching a screenshot of the Spark UI. The
 first 4 levels complete in less than 6 seconds, while the 5th level doesn't
 complete even after several hours.
 Because this is somebody else's data, I can't share it.

 Can you check the code snippet attached in my first email and see if it
 needs something to enable it to work for large data and >= 5 levels? It is
 working for 3 levels on the same dataset, but not for 5 levels.

 In the meantime, I will try to run it on the latest master and let you
 know the results. If it runs fine there, then it can be related to the
 128 MB limit issue that you mentioned.

 Thanks and Regards,
 Suraj Sheth



 On Sat, Jun 14, 2014 at 12:05 AM, Manish Amde manish...@gmail.com wrote:

 Hi Suraj,

 I can't answer 1) without knowing the data. However, the results for 2)
 are surprising indeed. We have tested with a billion samples for regression
 tasks, so I am perplexed by this behavior.

 Could you try the latest Spark master to see whether this problem goes
 away? It has code that limits memory consumption at the master and worker
 nodes to 128 MB by default, which ideally should not be needed given the
 amount of RAM on your cluster.

 Also, feel free to send the DEBUG logs. It might give me a better idea of
 where the algorithm is getting stuck.

 -Manish



 On Wed, Jun 11, 2014 at 1:20 PM, SURAJ SHETH shet...@gmail.com wrote:

 Hi Filipus,
 The train data is already oversampled.
 The number of positives I mentioned above is for the test dataset:
 12028 (apologies for not making this clear earlier).
 The train dataset has 61,264 positives out of 689,763 total rows. The
 number of negatives is 628,499.
 Oversampling was done for the train dataset to ensure that we have
 at least 9-10% positives in the train part.
 No oversampling is done for the test dataset.

 So, the only difference that remains is the amount of data used for
 building a tree.

 But I have a few more questions:
 Have we tried how much data can be used, at most, to build a single
 Decision Tree?
 Since I have enough RAM to fit all the data into memory (only 1.3 GB of
 train data and 30 x 3 GB of RAM), I would expect it to build a single
 Decision Tree with all the data without any issues. But for maxDepth = 5,
 it is not able to. I confirmed that while it keeps running for hours, the
 amount of free memory available is more than 70%. So it doesn't seem to be
 a memory issue either.


 Thanks and Regards,
 Suraj Sheth


 On Wed, Jun 11, 2014 at 10:19 PM, filipus floe...@gmail.com wrote:

 Well, I guess your problem is quite unbalanced, and due to the information
 value as a splitting criterion I guess the algo stops after very few splits.

 A workaround is oversampling:

 build many training datasets like

 take randomly 50% of the positives, and from the negatives the same amount,
 or say double that

 => 6000 positives and 12000 negatives

 build a tree

 this you do many times => many models (agents)

 and then you make an ensemble model, meaning all the models vote

 in a way similar to random forest, but arrived at completely differently


 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-Decision-Tree-not-getting-built-for-5-or-more-levels-maxDepth-5-and-the-one-built-for-3-levelsy-tp7401p7405.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







Akka listens to hostname while user may spark-submit with master in IP url

2014-06-15 Thread Hao Wang
Hi, All

In Spark, spark.driver.host defaults to the driver's hostname, so the Akka
actor system will listen on a URL like akka.tcp://hostname:port. However,
when a user tries to use spark-submit to run an application, the user may set
--master spark://192.168.1.12:7077.

Then the *AppClient* in *SparkDeploySchedulerBackend* cannot successfully
register with the Master, and the console prints:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have sufficient
memory

I think we need to improve this by making Akka recognise both the hostname
and the corresponding IP, or at least add a note to the Spark documentation
to discourage users from using an IP here. Any comments?

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


Re: long GC pause during file.cache()

2014-06-15 Thread Surendranauth Hiraman
Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?



On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you
 don’t mind the WARNING in the logs

 you can set spark.executor.extraJavaOptions in your SparkConf obj

 Best,

 --
 Nan Zhu

 On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:

 Hi, Wei

 You may try to set JVM opts in *spark-env.sh* as
 follows to prevent or mitigate GC pauses:

 export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
 -Xmx2g -XX:MaxPermSize=256m"

 There are more options you could add, please just Google :)


 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:

 Hi,

   I have a single node (192G RAM) stand-alone spark, with memory
 configuration like this in spark-env.sh

 SPARK_WORKER_MEMORY=180g
 SPARK_MEM=180g


  In spark-shell I have a program like this:

 val file = sc.textFile("/localpath") // file size is 40G
 file.cache()


 val output = file.map(line => extract something from line)

 output.saveAsTextFile (...)


 When I run this program again and again, or keep trying file.unpersist()
 --> file.cache() --> output.saveAsTextFile(), the run time varies a lot,
 from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min,
 from the stage monitoring GUI I observe big GC pause (some can be 10+ min).
 Of course when run-time is normal, say ~1 min, no significant GC is
 observed. The behavior seems somewhat random.

 Is there any JVM tuning I should do to prevent this long GC pause from
 happening?



 I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something
 like this:

 root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12
 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp
 ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
 -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g
 org.apache.spark.deploy.SparkSubmit spark-shell --class
 org.apache.spark.repl.Main

 Best regards,
 Wei

 -
 Wei Tan, PhD
 Research Staff Member
 IBM T. J. Watson Research Center
 *http://researcher.ibm.com/person/us-wtan*






-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: long GC pause during file.cache()

2014-06-15 Thread Nan Zhu
Yes, I think in the spark-env.sh.template, it is listed in the comments (didn’t 
check….)  

Best,  

--  
Nan Zhu


On Sunday, June 15, 2014 at 5:21 PM, Surendranauth Hiraman wrote:

 Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?
  
  
  
 On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote:
  SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you don’t 
  mind the WARNING in the logs
   
  you can set spark.executor.extraJavaOptions in your SparkConf obj
   
  Best,
   
  --  
  Nan Zhu
   
   
  On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:
   
   Hi, Wei

   You may try to set JVM opts in spark-env.sh as
   follows to prevent or mitigate GC pauses:

   export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
   -Xmx2g -XX:MaxPermSize=256m"

   There are more options you could add, please just Google :)  


   Regards,
   Wang Hao(王灏)

   CloudTeam | School of Software Engineering
   Shanghai Jiao Tong University
   Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
   Email:wh.s...@gmail.com






   On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:
Hi,  
 
  I have a single node (192G RAM) stand-alone spark, with memory
 configuration like this in spark-env.sh
 
SPARK_WORKER_MEMORY=180g  
SPARK_MEM=180g  
 
 
 In spark-shell I have a program like this:  
 
 val file = sc.textFile("/localpath") // file size is 40G
file.cache()  
 
 
 val output = file.map(line => extract something from line)
 
output.saveAsTextFile (...)  
 
 
When I run this program again and again, or keep trying 
 file.unpersist() --> file.cache() --> output.saveAsTextFile(), the run
time varies a lot, from 1 min to 3 min to 50+ min. Whenever the 
run-time is more than 1 min, from the stage monitoring GUI I observe 
big GC pause (some can be 10+ min). Of course when run-time is 
normal, say ~1 min, no significant GC is observed. The behavior seems 
somewhat random.  
 
Is there any JVM tuning I should do to prevent this long GC pause from 
happening?  
 
 
 
I used java-1.6.0-openjdk.x86_64, and my spark-shell process is 
something like this:  
 
root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12 
/usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp 
::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
 -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g 
org.apache.spark.deploy.SparkSubmit spark-shell --class 
org.apache.spark.repl.Main  
 
Best regards,  
Wei  
 
-  
Wei Tan, PhD  
Research Staff Member  
IBM T. J. Watson Research Center  
http://researcher.ibm.com/person/us-wtan
   
  
  
  
 --  
 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning
  
 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io
  



pyspark serializer can't handle functions?

2014-06-15 Thread madeleine
It seems that the default serializer used by pyspark can't serialize a list
of functions.
I've seen some posts about trying to fix this by using dill to serialize
rather than pickle. 
Does anyone know what the status of that project is, or whether there's
another easy workaround?

I've pasted a sample error message below. Here, regs is a function defined
in another file myfile.py that has been included on all workers via the
pyFiles argument to SparkContext: sc = SparkContext("local",
"myapp", pyFiles=["myfile.py"]).

  File "runfile.py", line 45, in __init__
    regsRDD = sc.parallelize([regs]*self.n)
  File "/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/context.py",
line 223, in parallelize
    serializer.dump_stream(c, tempFile)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
182, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
118, in dump_stream
    self._write_with_length(obj, stream)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
128, in _write_with_length
    serialized = self.dumps(obj)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
270, in dumps
    def dumps(self, obj): return cPickle.dumps(obj, 2)
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup
__builtin__.function failed



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-serializer-can-t-handle-functions-tp7650.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: GroupByKey results in OOM - Any other alternative

2014-06-15 Thread Ian O'Connell
Depending on your requirements when doing hourly metrics, for calculating
distinct cardinality a much more scalable method would be to use a
HyperLogLog data structure.
A Scala impl people have used with Spark is
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
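
A rough sketch of wiring that in (assuming algebird's HyperLogLogMonoid and
its create/plus/approximateSize API; bits and key/value types are made up):

import com.twitter.algebird.HyperLogLogMonoid
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val hll = new HyperLogLogMonoid(bits = 12) // ~1.6% standard error

// one HLL per key; HLLs from the hourly partitions can later be merged
// with hll.plus instead of recomputing distincts over the union
def approxDistinct(pairs: RDD[(String, String)]): RDD[(String, Long)] =
  pairs
    .mapValues(v => hll.create(v.getBytes("UTF-8")))
    .reduceByKey(hll.plus(_, _))
    .mapValues(_.approximateSize.estimate)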


On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 Vivek,

 If the foldByKey solution doesn't work for you, my team uses
 RDD.persist(DISK_ONLY) to avoid OOM errors.

 It's slower, of course, and requires tuning other config parameters. It
 can also be a problem if you do not have enough disk space, meaning that
 you have to unpersist at the right points if you are running long flows.

 For us, even though the disk writes are a performance hit, we prefer the
 Spark programming model to Hadoop M/R. But we are still working on getting
 this to work end to end on 100s of GB of data on our 16-node cluster.

 Suren



 On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS vivek...@gmail.com wrote:

 Thanks for the input. I will give foldByKey a shot.

 The way I am doing is, data is partitioned hourly. So I am computing
 distinct values hourly. Then I use unionRDD to merge them and compute
 distinct on the overall data.

  Is there a way to know which (key, value) pair is resulting in the OOM?
  Is there a way to set parallelism in the map stage so that each worker
 will process one key at a time?

 I didn't realise countApproxDistinctByKey is using HyperLogLogPlus. This
 should be interesting.

 --Vivek


 On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote:

 Grouping by key is always problematic since a key might have a huge
 number of values. You can do a little better than grouping *all* values and
 *then* finding distinct values by using foldByKey, putting values into a
 Set. At least you end up with only distinct values in memory. (You don't
 need two maps either, right?)

 If the number of distinct values is still huge for some keys, consider
 the experimental method countApproxDistinctByKey:
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285

 This should be much more performant at the cost of some accuracy.


 On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS vivek...@gmail.com wrote:

 Hi,
For last couple of days I have been trying hard to get around this
 problem. Please share any insights on solving this problem.

 Problem :
 There is a huge list of (key, value) pairs. I want to transform this to
 (key, distinct values) and then eventually to (key, distinct values count)

 On small dataset

 groupByKey().map( x => (x._1, x._2.distinct)) ...map(x => (x._1,
 x._2.distinct.count))

 On large data set I am getting OOM.

 Is there a way to represent Seq of values from groupByKey as RDD and
 then perform distinct over it ?

 Thanks
 Vivek






 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




Re: long GC pause during file.cache()

2014-06-15 Thread Aaron Davidson
Note also that Java does not work well with very large JVM heaps due to this
exact issue. There are two commonly used workarounds:

1) Spawn multiple (smaller) executors on the same machine. This can be done
by creating multiple Workers (via SPARK_WORKER_INSTANCES in standalone
mode [1]).
2) Use Tachyon for off-heap caching of RDDs, allowing Spark executors to be
smaller and to avoid GC pauses.

[1] See standalone documentation here:
http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
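
For example, in spark-env.sh on each machine (the numbers are purely
illustrative; keep instances x memory within the machine's RAM):

# four smaller workers instead of one huge JVM
SPARK_WORKER_INSTANCES=4
SPARK_WORKER_MEMORY=45g
SPARK_WORKER_CORES=8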


On Sun, Jun 15, 2014 at 3:50 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  Yes, I think in the spark-env.sh.template, it is listed in the comments
 (didn’t check….)

 Best,

 --
 Nan Zhu

 On Sunday, June 15, 2014 at 5:21 PM, Surendranauth Hiraman wrote:

 Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?



 On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you
 don’t mind the WARNING in the logs

 you can set spark.executor.extraJavaOptions in your SparkConf obj

 Best,

 --
 Nan Zhu

 On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:

 Hi, Wei

 You may try to set JVM opts in *spark-env.sh* as
 follows to prevent or mitigate GC pauses:

 export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
 -Xmx2g -XX:MaxPermSize=256m"

 There are more options you could add, please just Google :)


 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:

 Hi,

   I have a single node (192G RAM) stand-alone spark, with memory
 configuration like this in spark-env.sh

 SPARK_WORKER_MEMORY=180g
 SPARK_MEM=180g


  In spark-shell I have a program like this:

 val file = sc.textFile("/localpath") // file size is 40G
 file.cache()


 val output = file.map(line => extract something from line)

 output.saveAsTextFile (...)


 When I run this program again and again, or keep trying file.unpersist()
 --> file.cache() --> output.saveAsTextFile(), the run time varies a lot,
 from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min,
 from the stage monitoring GUI I observe big GC pause (some can be 10+ min).
 Of course when run-time is normal, say ~1 min, no significant GC is
 observed. The behavior seems somewhat random.

 Is there any JVM tuning I should do to prevent this long GC pause from
 happening?



 I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something
 like this:

 root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12
 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp
 ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
 -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g
 org.apache.spark.deploy.SparkSubmit spark-shell --class
 org.apache.spark.repl.Main

 Best regards,
 Wei

 -
 Wei Tan, PhD
 Research Staff Member
 IBM T. J. Watson Research Center
 *http://researcher.ibm.com/person/us-wtan*






 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io





Re: GroupByKey results in OOM - Any other alternative

2014-06-15 Thread Krishna Sankar
Ian,
   Yep, HLL is an appropriate mechanism. countApproxDistinctByKey is a
wrapper around
com.clearspring.analytics.stream.cardinality.HyperLogLogPlus.
Cheers
k/


On Sun, Jun 15, 2014 at 4:50 PM, Ian O'Connell i...@ianoconnell.com wrote:

 Depending on your requirements when doing hourly metrics calculating
 distinct cardinality, a much more scalable method would be to use a hyper
 log log data structure.
 a scala impl people have used with spark would be
 https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala


 On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 Vivek,

 If the foldByKey solution doesn't work for you, my team uses
 RDD.persist(DISK_ONLY) to avoid OOM errors.

 It's slower, of course, and requires tuning other config parameters. It
 can also be a problem if you do not have enough disk space, meaning that
 you have to unpersist at the right points if you are running long flows.

 For us, even though the disk writes are a performance hit, we prefer the
 Spark programming model to Hadoop M/R. But we are still working on getting
 this to work end to end on 100s of GB of data on our 16-node cluster.

 Suren



 On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS vivek...@gmail.com wrote:

 Thanks for the input. I will give foldByKey a shot.

 The way I am doing is, data is partitioned hourly. So I am computing
 distinct values hourly. Then I use unionRDD to merge them and compute
 distinct on the overall data.

  Is there a way to know which (key, value) pair is resulting in the OOM?
  Is there a way to set parallelism in the map stage so that each
 worker will process one key at a time?

 I didn't realise countApproxDistinctByKey is using HyperLogLogPlus.
 This should be interesting.

 --Vivek


 On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote:

 Grouping by key is always problematic since a key might have a huge
 number of values. You can do a little better than grouping *all* values and
 *then* finding distinct values by using foldByKey, putting values into a
 Set. At least you end up with only distinct values in memory. (You don't
 need two maps either, right?)

 If the number of distinct values is still huge for some keys, consider
 the experimental method countApproxDistinctByKey:
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285

 This should be much more performant at the cost of some accuracy.


 On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS vivek...@gmail.com wrote:

 Hi,
For last couple of days I have been trying hard to get around this
 problem. Please share any insights on solving this problem.

 Problem :
 There is a huge list of (key, value) pairs. I want to transform this
 to (key, distinct values) and then eventually to (key, distinct values
 count)

 On small dataset

 groupByKey().map( x => (x._1, x._2.distinct)) ...map(x => (x._1,
 x._2.distinct.count))

 On large data set I am getting OOM.

 Is there a way to represent Seq of values from groupByKey as RDD and
 then perform distinct over it ?

 Thanks
 Vivek






 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io