Re: GroupByKey results in OOM - Any other alternative

2014-06-15 Thread Surendranauth Hiraman
Vivek,

If the foldByKey solution doesn't work for you, my team uses
RDD.persist(DISK_ONLY) to avoid OOM errors.

It's slower, of course, and requires tuning other config parameters. It can
also be a problem if you do not have enough disk space, meaning that you
have to unpersist at the right points if you are running long flows.
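
A minimal sketch of that pattern (hedged; the RDD name and element types are
illustrative, not from the original thread):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Group, but spill the result to local disk instead of keeping it on the JVM heap;
// slower, but it avoids holding every per-key value list in executor memory at once.
def groupToDisk(pairs: RDD[(String, String)]): RDD[(String, Iterable[String])] = {
  val grouped = pairs.groupByKey().persist(StorageLevel.DISK_ONLY)
  // ... run the stages that need `grouped` here ...
  // then free the local disk explicitly so long flows don't fill it up:
  // grouped.unpersist(blocking = true)
  grouped
}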

For us, even though the disk writes are a performance hit, we prefer the
Spark programming model to Hadoop M/R. But we are still working on getting
this to work end to end on 100s of GB of data on our 16-node cluster.

Suren



On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS  wrote:

> Thanks for the input. I will give foldByKey a shot.
>
> The way I am doing is, data is partitioned hourly. So I am computing
> distinct values hourly. Then I use unionRDD to merge them and compute
> distinct on the overall data.
>
> > Is there a way to know which key,value pair is resulting in the OOM?
> > Is there a way to set parallelism in the map stage so that each worker
> will process one key at a time?
>
> I didn't realise countApproxDistinctByKey is using hyperloglogplus. This
> should be interesting.
>
> --Vivek
>
>
> On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen  wrote:
>
>> Grouping by key is always problematic since a key might have a huge
>> number of values. You can do a little better than grouping *all* values and
>> *then* finding distinct values by using foldByKey, putting values into a
>> Set. At least you end up with only distinct values in memory. (You don't
>> need two maps either, right?)
>>
>> If the number of distinct values is still huge for some keys, consider
>> the experimental method countApproxDistinctByKey:
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285
>>
>> This should be much more performant at the cost of some accuracy.
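
A compact sketch of the fold-into-a-Set idea above (hedged; assumes Spark 1.x
pair-RDD imports and illustrative String keys and values):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Build the distinct set per key during the shuffle instead of grouping all values first.
def distinctCountsPerKey(pairs: RDD[(String, String)]): RDD[(String, Int)] =
  pairs
    .mapValues(Set(_))                     // lift each value into a one-element Set
    .foldByKey(Set.empty[String])(_ ++ _)  // merge sets, keeping only distinct values
    .mapValues(_.size)                     // (key, distinct value count)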
>>
>>
>> On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS  wrote:
>>
>>> Hi,
>>>For last couple of days I have been trying hard to get around this
>>> problem. Please share any insights on solving this problem.
>>>
>>> Problem :
>>> There is a huge list of (key, value) pairs. I want to transform this to
>>> (key, distinct values) and then eventually to (key, distinct values count)
>>>
>>> On small dataset
>>>
>>> groupByKey().map( x => (x._1, x._2.distinct)) ...map(x => (x._1,
>>> x._2.distinct.count))
>>>
>>> On large data set I am getting OOM.
>>>
>>> Is there a way to represent Seq of values from groupByKey as RDD and
>>> then perform distinct over it ?
>>>
>>> Thanks
>>> Vivek
>>>
>>
>>
>


-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v elos.io
W: www.velos.io


Using custom class as a key for groupByKey() or reduceByKey()

2014-06-15 Thread Gaurav Jain
I have a simple Java class as follows, that I want to use as a key while
applying groupByKey or reduceByKey functions:

private static class FlowId {
    public String dcxId;
    public String trxId;
    public String msgType;

    public FlowId(String dcxId, String trxId, String msgType) {
        this.dcxId = dcxId;
        this.trxId = trxId;
        this.msgType = msgType;
    }

    public boolean equals(Object other) {
        if (other == this) return true;
        if (other == null) return false;
        if (getClass() != other.getClass()) return false;
        FlowId fid = (FlowId) other;
        if (this.dcxId.equals(fid.dcxId) &&
            this.trxId.equals(fid.trxId) &&
            this.msgType.equals(fid.msgType)) {
            return true;
        }
        return false;
    }
}

I figured that an equals() method would need to be overridden to ensure
comparison of keys, but entries with the same key are still listed
separately after applying a groupByKey(), for example. What further
modifications are necessary to enable usage of the above class as a key? Right
now, I have fallen back to using Tuple3<String, String, String> instead of
the FlowId class, but it makes the code unnecessarily verbose.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-custom-class-as-a-key-for-groupByKey-or-reduceByKey-tp7640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using custom class as a key for groupByKey() or reduceByKey()

2014-06-15 Thread Sean Owen
In Java at large, you must always implement hashCode() when you implement
equals(). This is not specific to Spark. It maintains the contract that two
instances that are equal according to equals() also have the same hash code,
which is not the case for your class now. This causes weird things to happen
wherever the hash code contract is depended upon.

This probably works fine:

@Override
public int hashCode() {
  return dcxId.hashCode() ^ trxId.hashCode() ^ msgType.hashCode();
}


On Sun, Jun 15, 2014 at 11:45 AM, Gaurav Jain  wrote:

> I have a simple Java class as follows, that I want to use as a key while
> applying groupByKey or reduceByKey functions:
>
> private static class FlowId {
> public String dcxId;
> public String trxId;
> public String msgType;
>
> public FlowId(String dcxId, String trxId, String msgType) {
> this.dcxId = dcxId;
> this.trxId = trxId;
> this.msgType = msgType;
> }
>
> public boolean equals(Object other) {
> if (other == this) return true;
> if (other == null) return false;
> if (getClass() != other.getClass()) return false;
> FlowId fid = (FlowId) other;
> if (this.dcxId.equals(fid.dcxId) &&
> this.trxId.equals(fid.trxId) &&
> this.msgType.equals(fid.msgType)) {
> return true;
> }
> return false;
> }
> }
>
> I figured that an equals() method would need to be overridden to ensure
> comparison of keys, but entries with the same key are still listed
> separately after applying a groupByKey(), for example. What further
> modifications are necessary to enable usage of the above class as a key? Right
> now, I have fallen back to using Tuple3<String, String, String> instead of
> the FlowId class, but it makes the code unnecessarily verbose.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-custom-class-as-a-key-for-groupByKey-or-reduceByKey-tp7640.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: long GC pause during file.cache()

2014-06-15 Thread Hao Wang
Hi, Wei

You may try to set JVM opts in *spark-env.sh* as follows to prevent or
mitigate GC pauses:

export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
-Xmx2g -XX:MaxPermSize=256m"

There are more options you could add, please just Google :)


Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan  wrote:

> Hi,
>
>   I have a single node (192G RAM) stand-alone spark, with memory
> configuration like this in spark-env.sh
>
> SPARK_WORKER_MEMORY=180g
> SPARK_MEM=180g
>
>
>  In spark-shell I have a program like this:
>
> val file = sc.textFile("/localpath") //file size is 40G
> file.cache()
>
>
> val output = file.map(line => extract something from line)
>
> output.saveAsTextFile (...)
>
>
> When I run this program again and again, or keep trying file.unpersist()
> --> file.cache() --> output.saveAsTextFile(), the run time varies a lot,
> from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min,
> from the stage monitoring GUI I observe big GC pause (some can be 10+ min).
> Of course when run-time is "normal", say ~1 min, no significant GC is
> observed. The behavior seems somewhat random.
>
> Is there any JVM tuning I should do to prevent this long GC pause from
> happening?
>
>
>
> I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something
> like this:
>
> root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12
> /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp
> ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
> -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g
> org.apache.spark.deploy.SparkSubmit spark-shell --class
> org.apache.spark.repl.Main
>
> Best regards,
> Wei
>
> -
> Wei Tan, PhD
> Research Staff Member
> IBM T. J. Watson Research Center
> *http://researcher.ibm.com/person/us-wtan*
> 


Re: Using custom class as a key for groupByKey() or reduceByKey()

2014-06-15 Thread Andrew Ash
Good point Sean.  I've filed a ticket to document the equals() / hashCode()
requirements for custom keys in the Spark documentation, as this has come
up a few times on the user@ list.

https://issues.apache.org/jira/browse/SPARK-2148


On Sun, Jun 15, 2014 at 12:11 PM, Sean Owen  wrote:

> In Java at large, you must always implement hashCode() when you implement
> equals(). This is not specific to Spark. This is to maintain the contract
> that two equals() instances have the same hash code, and that's not the
> case for your class now. This causes weird things to happen wherever the
> hash code contract is depended upon.
>
> This probably works fine:
>
> @Override
> public int hashCode() {
>   return dcxId.hashCode() ^ trxId.hashCode() ^ msgType.hashCode();
> }
>
>
> On Sun, Jun 15, 2014 at 11:45 AM, Gaurav Jain 
> wrote:
>
>> I have a simple Java class as follows, that I want to use as a key while
>> applying groupByKey or reduceByKey functions:
>>
>> private static class FlowId {
>> public String dcxId;
>> public String trxId;
>> public String msgType;
>>
>> public FlowId(String dcxId, String trxId, String msgType)
>> {
>> this.dcxId = dcxId;
>> this.trxId = trxId;
>> this.msgType = msgType;
>> }
>>
>> public boolean equals(Object other) {
>> if (other == this) return true;
>> if (other == null) return false;
>> if (getClass() != other.getClass()) return false;
>> FlowId fid = (FlowId) other;
>> if (this.dcxId.equals(fid.dcxId) &&
>> this.trxId.equals(fid.trxId) &&
>> this.msgType.equals(fid.msgType))
>> {
>> return true;
>> }
>> return false;
>> }
>> }
>>
>> I figured that an equals() method would need to be overridden to ensure
>> comparison of keys, but entries with the same key are still listed
>> separately after applying a groupByKey(), for example. What further
>> modifications are necessary to enable usage of the above class as a key? Right
>> now, I have fallen back to using Tuple3<String, String, String> instead of
>> the FlowId class, but it makes the code unnecessarily verbose.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-custom-class-as-a-key-for-groupByKey-or-reduceByKey-tp7640.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: long GC pause during file.cache()

2014-06-15 Thread Nan Zhu
SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you don’t mind 
the WARNING in the logs

you can set spark.executor.extraJavaOpts in your SparkConf obj  
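
For example (a hedged sketch; per the 1.0 configuration docs the property is
spelled spark.executor.extraJavaOptions, and the heap size itself is set through
spark.executor.memory rather than -Xmx):

import org.apache.spark.{SparkConf, SparkContext}

// Pass GC-related executor JVM options through SparkConf instead of the
// deprecated SPARK_JAVA_OPTS environment variable.
val conf = new SparkConf()
  .setAppName("cache-test")
  .set("spark.executor.extraJavaOptions",
       "-XX:+UseConcMarkSweepGC -XX:MaxPermSize=256m")
val sc = new SparkContext(conf)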

Best,

--  
Nan Zhu


On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:

> Hi, Wei
>  
> You may try to set JVM opts in spark-env.sh as follows
> to prevent or mitigate GC pauses:
>  
> export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC 
> -Xmx2g -XX:MaxPermSize=256m"
>  
> There are more options you could add, please just Google :)  
>  
>  
> Regards,
> Wang Hao(王灏)
>  
> CloudTeam | School of Software Engineering
> Shanghai Jiao Tong University
> Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
> Email:wh.s...@gmail.com
>  
>  
>  
>  
>  
>  
> On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan  wrote:
> > Hi,  
> >  
> >   I have a single node (192G RAM) stand-alone spark, with memory 
> > configuration like this in spark-env.sh
> >  
> > SPARK_WORKER_MEMORY=180g  
> > SPARK_MEM=180g  
> >  
> >  
> >  In spark-shell I have a program like this:  
> >  
> > val file = sc.textFile("/localpath") //file size is 40G  
> > file.cache()  
> >  
> >  
> > val output = file.map(line => extract something from line)  
> >  
> > output.saveAsTextFile (...)  
> >  
> >  
> > When I run this program again and again, or keep trying file.unpersist() 
> > --> file.cache() --> output.saveAsTextFile(), the run time varies a lot, 
> > from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min, 
> > from the stage monitoring GUI I observe big GC pause (some can be 10+ min). 
> > Of course when run-time is "normal", say ~1 min, no significant GC is 
> > observed. The behavior seems somewhat random.  
> >  
> > Is there any JVM tuning I should do to prevent this long GC pause from 
> > happening?  
> >  
> >  
> >  
> > I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something 
> > like this:  
> >  
> > root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12 
> > /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp 
> > ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
> >  -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g 
> > org.apache.spark.deploy.SparkSubmit spark-shell --class 
> > org.apache.spark.repl.Main  
> >  
> > Best regards,  
> > Wei  
> >  
> > -  
> > Wei Tan, PhD  
> > Research Staff Member  
> > IBM T. J. Watson Research Center  
> > http://researcher.ibm.com/person/us-wtan



Re: MLLib : Decision Tree not getting built for 5 or more levels(maxDepth=5) and the one built for 3 levels is performing poorly

2014-06-15 Thread Manish Amde
Hi Suraj,

I don't see any logs from mllib. You might need to explicitly set the logging
level to DEBUG for mllib. Adding this line to log4j.properties might fix the
problem:
log4j.logger.org.apache.spark.mllib.tree=DEBUG

Also, please let me know if you encounter similar problems with the
Spark master.

-Manish


On Sat, Jun 14, 2014 at 3:19 AM, SURAJ SHETH  wrote:

> Hi Manish,
> Thanks for your reply.
>
> I am attaching the logs here (regression, 5 levels). It contains the last
> few hundred lines. Also, I am attaching a screenshot of the Spark UI. The first 4
> levels complete in less than 6 seconds, while the 5th level doesn't
> complete even after several hours.
> Because this is somebody else's data, I can't share it.
>
> Can you check the code snippet attached in my first email and see if it
> needs something to enable it to work for large data and >= 5 levels? It is
> working for 3 levels on the same dataset, but not for 5 levels.
>
> In the mean time, I will try to run it on the latest master and let you
> know the results. If it runs fine there, then, it can be related to 128 MB
> limit issue that you mentioned.
>
> Thanks and Regards,
> Suraj Sheth
>
>
>
> On Sat, Jun 14, 2014 at 12:05 AM, Manish Amde  wrote:
>
>> Hi Suraj,
>>
>> I can't answer 1) without knowing the data. However, the results for 2)
>> are surprising indeed. We have tested with a billion samples for regression
>> tasks so I am perplexed with the behavior.
>>
>> Could you try the latest Spark master to see whether this problem goes
>> away. It has code that limits memory consumption at the master and worker
>> nodes to 128 MB by default which ideally should not be needed given the
>> amount of RAM on your cluster.
>>
>> Also, feel free to send the DEBUG logs. It might give me a better idea of
>> where the algorithm is getting stuck.
>>
>> -Manish
>>
>>
>>
>> On Wed, Jun 11, 2014 at 1:20 PM, SURAJ SHETH  wrote:
>>
>>> Hi Filipus,
>>> The train data is already oversampled.
>>> The number of positives I mentioned above is for the test dataset :
>>> 12028 (apologies for not making this clear earlier)
>>> The train dataset has 61,264 positives out of 689,763 total rows. The
>>> number of negatives is 628,499.
>>> Oversampling was done for the train dataset to ensure that we have
>>> at least 9-10% of positives in the train part.
>>> No oversampling is done for the test dataset.
>>>
>>> So, the only difference that remains is the amount of data used for
>>> building a tree.
>>>
>>> But I have a few more questions:
>>> Have we tried how much data can be used at most to build a single
>>> Decision Tree?
>>> Since I have enough RAM to fit all the data into memory (only 1.3 GB of
>>> train data and 30x3 GB of RAM), I would expect it to build a single
>>> Decision Tree with all the data without any issues. But for maxDepth >= 5,
>>> it is not able to. I confirmed that when it keeps running for hours, the
>>> amount of free memory available is more than 70%. So it doesn't seem to be
>>> a memory issue either.
>>>
>>>
>>> Thanks and Regards,
>>> Suraj Sheth
>>>
>>>
>>> On Wed, Jun 11, 2014 at 10:19 PM, filipus  wrote:
>>>
 well I guess your problem is quite unbalanced, and with information
 value as the splitting criterion the algo probably stops after very few
 splits

 a workaround is oversampling

 build many training datasets like this:

 randomly take 50% of the positives, and from the negatives the same amount,
 or say double that

 => 6000 positives and 12000 negatives

 build a tree

 do this many times => many models (agents)

 and then make an ensemble model, i.e. let all the models vote

 similar in spirit to a random forest, but built completely differently
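
A rough Scala sketch of that resample-and-vote idea (hedged: trainTree is a
placeholder for whatever DecisionTree training call is in use, and the sample
fractions are illustrative):

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.rdd.RDD

// Train many trees, each on ~50% of the positives plus a comparable slice of negatives.
def ensemble(pos: RDD[LabeledPoint], neg: RDD[LabeledPoint],
             trainTree: RDD[LabeledPoint] => DecisionTreeModel,
             numModels: Int): Seq[DecisionTreeModel] =
  (1 to numModels).map { i =>
    val posSample = pos.sample(false, 0.5, i)   // half the positives, new seed each time
    val negSample = neg.sample(false, 0.02, i)  // fraction chosen to give roughly 1-2x as many negatives
    trainTree(posSample.union(negSample))
  }

// Majority vote for one feature vector:
//   val votes = models.map(_.predict(features))
//   val label = if (votes.count(_ > 0.5) * 2 > models.size) 1.0 else 0.0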



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-Decision-Tree-not-getting-built-for-5-or-more-levels-maxDepth-5-and-the-one-built-for-3-levelsy-tp7401p7405.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

>>>
>>>
>>
>


Akka listens to hostname while user may spark-submit with master in IP url

2014-06-15 Thread Hao Wang
Hi, All

In Spark the "spark.driver.host" is driver hostname in default, thus, akka
actor system will listen to a URL like akka.tcp://hostname:port. However,
when a user tries to use spark-submit to run application, the user may set
"--master spark://192.168.1.12:7077".

Then, the *AppClient* in *SparkDeploySchedulerBackend* cannot successfully
register to the Master, and the console prints:

"WARN TaskSchedulerImpl: Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have sufficient
memory"

I think we need to improve this by making Akka recognise both the hostname and
the corresponding IP, or at least add a note to the Spark documentation to keep
users from using the IP. Any comments?
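
A hedged illustration of the mismatch from the application side (names are
placeholders): the standalone Master only accepts its own URL string verbatim, so
the value given to setMaster / --master should match the form the Master bound to
(as shown at the top of the Master web UI), hostname vs. IP included.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")
  // must be the exact URL the Master registered under, e.g. its hostname form
  .setMaster("spark://master-hostname:7077")
val sc = new SparkContext(conf)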

Regards,
Wang Hao(王灏)

CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
Email:wh.s...@gmail.com


Re: long GC pause during file.cache()

2014-06-15 Thread Surendranauth Hiraman
Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?



On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu  wrote:

>  SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you
> don’t mind the WARNING in the logs
>
> you can set spark.executor.extraJavaOpts in your SparkConf obj
>
> Best,
>
> --
> Nan Zhu
>
> On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:
>
> Hi, Wei
>
> You may try to set JVM opts in *spark-env.sh* as
> follows to prevent or mitigate GC pauses:
>
> export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
> -Xmx2g -XX:MaxPermSize=256m"
>
> There are more options you could add, please just Google :)
>
>
> Regards,
> Wang Hao(王灏)
>
> CloudTeam | School of Software Engineering
> Shanghai Jiao Tong University
> Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
> Email:wh.s...@gmail.com
>
>
> On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan  wrote:
>
> Hi,
>
>   I have a single node (192G RAM) stand-alone spark, with memory
> configuration like this in spark-env.sh
>
> SPARK_WORKER_MEMORY=180g
> SPARK_MEM=180g
>
>
>  In spark-shell I have a program like this:
>
> val file = sc.textFile("/localpath") //file size is 40G
> file.cache()
>
>
> val output = file.map(line => extract something from line)
>
> output.saveAsTextFile (...)
>
>
> When I run this program again and again, or keep trying file.unpersist()
> --> file.cache() --> output.saveAsTextFile(), the run time varies a lot,
> from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min,
> from the stage monitoring GUI I observe big GC pause (some can be 10+ min).
> Of course when run-time is "normal", say ~1 min, no significant GC is
> observed. The behavior seems somewhat random.
>
> Is there any JVM tuning I should do to prevent this long GC pause from
> happening?
>
>
>
> I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something
> like this:
>
> root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12
> /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp
> ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
> -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g
> org.apache.spark.deploy.SparkSubmit spark-shell --class
> org.apache.spark.repl.Main
>
> Best regards,
> Wei
>
> -
> Wei Tan, PhD
> Research Staff Member
> IBM T. J. Watson Research Center
> *http://researcher.ibm.com/person/us-wtan*
> 
>
>
>
>


-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v elos.io
W: www.velos.io


Re: long GC pause during file.cache()

2014-06-15 Thread Nan Zhu
Yes, I think in the spark-env.sh.template, it is listed in the comments (didn’t 
check….)  

Best,  

--  
Nan Zhu


On Sunday, June 15, 2014 at 5:21 PM, Surendranauth Hiraman wrote:

> Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?
>  
>  
>  
> On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu  wrote:
> > SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you don’t 
> > mind the WARNING in the logs
> >  
> > you can set spark.executor.extraJavaOpts in your SparkConf obj  
> >  
> > Best,
> >  
> > --  
> > Nan Zhu
> >  
> >  
> > On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:
> >  
> > > Hi, Wei
> > >  
> > > You may try to set JVM opts in spark-env.sh as
> > > follows to prevent or mitigate GC pauses:
> > >  
> > > export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC 
> > > -Xmx2g -XX:MaxPermSize=256m"
> > >  
> > > There are more options you could add, please just Google :)  
> > >  
> > >  
> > > Regards,
> > > Wang Hao(王灏)
> > >  
> > > CloudTeam | School of Software Engineering
> > > Shanghai Jiao Tong University
> > > Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
> > > Email:wh.s...@gmail.com
> > >  
> > >  
> > >  
> > >  
> > >  
> > >  
> > > On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan  wrote:
> > > > Hi,  
> > > >  
> > > >   I have a single node (192G RAM) stand-alone spark, with memory 
> > > > configuration like this in spark-env.sh
> > > >  
> > > > SPARK_WORKER_MEMORY=180g  
> > > > SPARK_MEM=180g  
> > > >  
> > > >  
> > > >  In spark-shell I have a program like this:  
> > > >  
> > > > val file = sc.textFile("/localpath") //file size is 40G  
> > > > file.cache()  
> > > >  
> > > >  
> > > > val output = file.map(line => extract something from line)  
> > > >  
> > > > output.saveAsTextFile (...)  
> > > >  
> > > >  
> > > > When I run this program again and again, or keep trying 
> > > > file.unpersist() --> file.cache() --> output.saveAsTextFile(), the run 
> > > > time varies a lot, from 1 min to 3 min to 50+ min. Whenever the 
> > > > run-time is more than 1 min, from the stage monitoring GUI I observe 
> > > > big GC pause (some can be 10+ min). Of course when run-time is 
> > > > "normal", say ~1 min, no significant GC is observed. The behavior seems 
> > > > somewhat random.  
> > > >  
> > > > Is there any JVM tuning I should do to prevent this long GC pause from 
> > > > happening?  
> > > >  
> > > >  
> > > >  
> > > > I used java-1.6.0-openjdk.x86_64, and my spark-shell process is 
> > > > something like this:  
> > > >  
> > > > root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12 
> > > > /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp 
> > > > ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
> > > >  -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g 
> > > > org.apache.spark.deploy.SparkSubmit spark-shell --class 
> > > > org.apache.spark.repl.Main  
> > > >  
> > > > Best regards,  
> > > > Wei  
> > > >  
> > > > -  
> > > > Wei Tan, PhD  
> > > > Research Staff Member  
> > > > IBM T. J. Watson Research Center  
> > > > http://researcher.ibm.com/person/us-wtan
> >  
>  
>  
>  
> --  
>  SUREN HIRAMAN, 
> VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>  
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>  



pyspark serializer can't handle functions?

2014-06-15 Thread madeleine
It seems that the default serializer used by pyspark can't serialize a list
of functions.
I've seen some posts about trying to fix this by using dill to serialize
rather than pickle. 
Does anyone know what the status of that project is, or whether there's
another easy workaround?

I've pasted a sample error message below. Here, regs is a function defined
in another file myfile.py that has been included on all workers via the
pyFiles argument to SparkContext: sc = SparkContext("local",
"myapp",pyFiles=["myfile.py"]).

  File "runfile.py", line 45, in __init__
regsRDD = sc.parallelize([regs]*self.n)
  File "/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/context.py",
line 223, in parallelize
serializer.dump_stream(c, tempFile)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
182, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
118, in dump_stream
self._write_with_length(obj, stream)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
128, in _write_with_length
serialized = self.dumps(obj)
  File
"/Applications/spark-0.9.1-bin-hadoop2/python/pyspark/serializers.py", line
270, in dumps
def dumps(self, obj): return cPickle.dumps(obj, 2)
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup
__builtin__.function failed



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-serializer-can-t-handle-functions-tp7650.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: GroupByKey results in OOM - Any other alternative

2014-06-15 Thread Ian O'Connell
Depending on your requirements when doing hourly metrics that calculate
distinct cardinality, a much more scalable method would be to use a hyper
log log data structure.
A Scala implementation people have used with Spark is algebird's:
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
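
A hedged sketch of that approach (method names per the algebird API as I recall
it — HyperLogLogMonoid, create, plus, estimatedSize — with illustrative String
keys and values):

import com.twitter.algebird.{HLL, HyperLogLogMonoid}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Merge small fixed-size HLL sketches per key instead of collecting the values themselves.
def approxDistinctPerKey(pairs: RDD[(String, String)]): RDD[(String, Double)] = {
  val hll = new HyperLogLogMonoid(12)                  // 2^12 registers, a few KB per key
  pairs
    .mapValues(v => hll.create(v.getBytes("UTF-8")))   // one tiny sketch per value
    .reduceByKey(hll.plus(_, _))                       // merge sketches during the shuffle
    .mapValues(_.estimatedSize)                        // approximate distinct count
}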


On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman <
suren.hira...@velos.io> wrote:

> Vivek,
>
> If the foldByKey solution doesn't work for you, my team uses
> RDD.persist(DISK_ONLY) to avoid OOM errors.
>
> It's slower, of course, and requires tuning other config parameters. It
> can also be a problem if you do not have enough disk space, meaning that
> you have to unpersist at the right points if you are running long flows.
>
> For us, even though the disk writes are a performance hit, we prefer the
> Spark programming model to Hadoop M/R. But we are still working on getting
> this to work end to end on 100s of GB of data on our 16-node cluster.
>
> Suren
>
>
>
> On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS  wrote:
>
>> Thanks for the input. I will give foldByKey a shot.
>>
>> The way I am doing is, data is partitioned hourly. So I am computing
>> distinct values hourly. Then I use unionRDD to merge them and compute
>> distinct on the overall data.
>>
>> > Is there a way to know which key,value pair is resulting in the OOM ?
>> > Is there a way to set parallelism in the map stage so that, each worker
>> will process one key at time. ?
>>
>> I didn't realise countApproxDistinctByKey is using hyperloglogplus. This
>> should be interesting.
>>
>> --Vivek
>>
>>
>> On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen  wrote:
>>
>>> Grouping by key is always problematic since a key might have a huge
>>> number of values. You can do a little better than grouping *all* values and
>>> *then* finding distinct values by using foldByKey, putting values into a
>>> Set. At least you end up with only distinct values in memory. (You don't
>>> need two maps either, right?)
>>>
>>> If the number of distinct values is still huge for some keys, consider
>>> the experimental method countApproxDistinctByKey:
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285
>>>
>>> This should be much more performant at the cost of some accuracy.
>>>
>>>
>>> On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS  wrote:
>>>
 Hi,
For last couple of days I have been trying hard to get around this
 problem. Please share any insights on solving this problem.

 Problem :
 There is a huge list of (key, value) pairs. I want to transform this to
 (key, distinct values) and then eventually to (key, distinct values count)

 On small dataset

 groupByKey().map( x => (x._1, x._2.distinct)) ...map(x => (x._1,
 x._2.distinct.count))

 On large data set I am getting OOM.

 Is there a way to represent Seq of values from groupByKey as RDD and
 then perform distinct over it ?

 Thanks
 Vivek

>>>
>>>
>>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>
>


Re: long GC pause during file.cache()

2014-06-15 Thread Aaron Davidson
Note also that Java does not work well with very large JVMs due to this
exact issue. There are two commonly used workarounds:

1) Spawn multiple (smaller) executors on the same machine. This can be done
by creating multiple Workers (via SPARK_WORKER_INSTANCES in standalone
mode[1]).
2) Use Tachyon for off-heap caching of RDDs, allowing Spark executors to be
smaller and avoid GC pauses

[1] See standalone documentation here:
http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts


On Sun, Jun 15, 2014 at 3:50 PM, Nan Zhu  wrote:

>  Yes, I think in the spark-env.sh.template, it is listed in the comments
> (didn’t check….)
>
> Best,
>
> --
> Nan Zhu
>
> On Sunday, June 15, 2014 at 5:21 PM, Surendranauth Hiraman wrote:
>
> Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?
>
>
>
> On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu  wrote:
>
>  SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you
> don’t mind the WARNING in the logs
>
> you can set spark.executor.extraJavaOpts in your SparkConf obj
>
> Best,
>
> --
> Nan Zhu
>
> On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:
>
> Hi, Wei
>
> You may try to set JVM opts in *spark-env.sh* as
> follows to prevent or mitigate GC pauses:
>
> export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
> -Xmx2g -XX:MaxPermSize=256m"
>
> There are more options you could add, please just Google :)
>
>
> Regards,
> Wang Hao(王灏)
>
> CloudTeam | School of Software Engineering
> Shanghai Jiao Tong University
> Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
> Email:wh.s...@gmail.com
>
>
> On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan  wrote:
>
> Hi,
>
>   I have a single node (192G RAM) stand-alone spark, with memory
> configuration like this in spark-env.sh
>
> SPARK_WORKER_MEMORY=180g
> SPARK_MEM=180g
>
>
>  In spark-shell I have a program like this:
>
> val file = sc.textFile("/localpath") //file size is 40G
> file.cache()
>
>
> val output = file.map(line => extract something from line)
>
> output.saveAsTextFile (...)
>
>
> When I run this program again and again, or keep trying file.unpersist()
> --> file.cache() --> output.saveAsTextFile(), the run time varies a lot,
> from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min,
> from the stage monitoring GUI I observe big GC pause (some can be 10+ min).
> Of course when run-time is "normal", say ~1 min, no significant GC is
> observed. The behavior seems somewhat random.
>
> Is there any JVM tuning I should do to prevent this long GC pause from
> happening?
>
>
>
> I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something
> like this:
>
> root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12
> /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp
> ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
> -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g
> org.apache.spark.deploy.SparkSubmit spark-shell --class
> org.apache.spark.repl.Main
>
> Best regards,
> Wei
>
> -
> Wei Tan, PhD
> Research Staff Member
> IBM T. J. Watson Research Center
> *http://researcher.ibm.com/person/us-wtan*
> 
>
>
>
>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>
>
>


Re: GroupByKey results in OOM - Any other alternative

2014-06-15 Thread Krishna Sankar
Ian,
   Yep, HLL is an appropriate mechanism. The countApproxDistinctByKey is a
wrapper around the
com.clearspring.analytics.stream.cardinality.HyperLogLogPlus.
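
A hedged usage sketch (the parameter name follows the 1.0-era PairRDDFunctions
signature linked earlier in the thread; key/value types are illustrative):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// (key, approximate distinct-value count); a smaller relativeSD buys accuracy
// at the cost of more memory per key.
def approxDistinct(pairs: RDD[(String, String)]): RDD[(String, Long)] =
  pairs.countApproxDistinctByKey(relativeSD = 0.05)
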
Cheers



On Sun, Jun 15, 2014 at 4:50 PM, Ian O'Connell  wrote:

> Depending on your requirements when doing hourly metrics calculating
> distinct cardinality, a much more scalable method would be to use a hyper
> log log data structure.
> a scala impl people have used with spark would be
> https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
>
>
> On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman <
> suren.hira...@velos.io> wrote:
>
>> Vivek,
>>
>> If the foldByKey solution doesn't work for you, my team uses
>> RDD.persist(DISK_ONLY) to avoid OOM errors.
>>
>> It's slower, of course, and requires tuning other config parameters. It
>> can also be a problem if you do not have enough disk space, meaning that
>> you have to unpersist at the right points if you are running long flows.
>>
>> For us, even though the disk writes are a performance hit, we prefer the
>> Spark programming model to Hadoop M/R. But we are still working on getting
>> this to work end to end on 100s of GB of data on our 16-node cluster.
>>
>> Suren
>>
>>
>>
>> On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS  wrote:
>>
>>> Thanks for the input. I will give foldByKey a shot.
>>>
>>> The way I am doing is, data is partitioned hourly. So I am computing
>>> distinct values hourly. Then I use unionRDD to merge them and compute
>>> distinct on the overall data.
>>>
>>> > Is there a way to know which key,value pair is resulting in the OOM ?
>>> > Is there a way to set parallelism in the map stage so that, each
>>> worker will process one key at time. ?
>>>
>>> I didn't realise countApproxDistinctByKey is using hyperloglogplus.
>>> This should be interesting.
>>>
>>> --Vivek
>>>
>>>
>>> On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen  wrote:
>>>
 Grouping by key is always problematic since a key might have a huge
 number of values. You can do a little better than grouping *all* values and
 *then* finding distinct values by using foldByKey, putting values into a
 Set. At least you end up with only distinct values in memory. (You don't
 need two maps either, right?)

 If the number of distinct values is still huge for some keys, consider
 the experimental method countApproxDistinctByKey:
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285

 This should be much more performant at the cost of some accuracy.


 On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS  wrote:

> Hi,
>For last couple of days I have been trying hard to get around this
> problem. Please share any insights on solving this problem.
>
> Problem :
> There is a huge list of (key, value) pairs. I want to transform this
> to (key, distinct values) and then eventually to (key, distinct values
> count)
>
> On small dataset
>
> groupByKey().map( x => (x._1, x._2.distinct)) ...map(x => (x._1,
> x._2.distinct.count))
>
> On large data set I am getting OOM.
>
> Is there a way to represent Seq of values from groupByKey as RDD and
> then perform distinct over it ?
>
> Thanks
> Vivek
>


>>>
>>
>>
>> --
>>
>> SUREN HIRAMAN, VP TECHNOLOGY
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR
>> NEW YORK, NY 10001
>> O: (917) 525-2466 ext. 105
>> F: 646.349.4063
>> E: suren.hiraman@v elos.io
>> W: www.velos.io
>>
>>
>


Re: GroupByKey results in OOM - Any other alternative

2014-06-15 Thread Vivek YS
The more fundamental question is why doesn't groupByKey return RDD[(K,
RDD[V])] instead of RDD[(K, Iterable[V])].

I wrote something like this (yet to test, and I am not sure if this is even
correct). I'd appreciate any suggestions/comments.

  def groupByKeyWithRDD(partitioner: Partitioner): RDD[(K, RDD[V])] = {
    def createCombiner(v: V) = self.context.parallelize(Array(v))
    def mergeValue(buf: RDD[V], v: V) = buf ++ self.context.parallelize(Array(v))
    def mergeCombiners(c1: RDD[V], c2: RDD[V]) = c1 ++ c2
    val bufs = combineByKey[RDD[V]](
      createCombiner _, mergeValue _, mergeCombiners _, partitioner,
      mapSideCombine = false)
    bufs
  }
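
For what it's worth, a hedged aside on why a sketch like the above is unlikely to
work as written: RDDs can only be created and transformed on the driver, so the
combiners, which run on the executors, cannot call context.parallelize or ++ on
RDDs. To treat one key's values as an RDD, the usual fallback is to filter the
pair RDD (expensive if repeated for many keys):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Materialize the values of a single key as their own RDD via a full filter pass.
def valuesForKey[K, V: ClassTag](pairs: RDD[(K, V)], key: K): RDD[V] =
  pairs.filter { case (k, _) => k == key }.map(_._2)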

--Vivek


On Mon, Jun 16, 2014 at 6:37 AM, Krishna Sankar  wrote:

> Ian,
>Yep, HLL is an appropriate mechanism. The countApproxDistinctByKey is
> a wrapper around the
> com.clearspring.analytics.stream.cardinality.HyperLogLogPlus.
> Cheers
> 
>
>
> On Sun, Jun 15, 2014 at 4:50 PM, Ian O'Connell 
> wrote:
>
>> Depending on your requirements when doing hourly metrics calculating
>> distinct cardinality, a much more scalable method would be to use a hyper
>> log log data structure.
>> a scala impl people have used with spark would be
>> https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
>>
>>
>> On Sun, Jun 15, 2014 at 6:16 AM, Surendranauth Hiraman <
>> suren.hira...@velos.io> wrote:
>>
>>> Vivek,
>>>
>>> If the foldByKey solution doesn't work for you, my team uses
>>> RDD.persist(DISK_ONLY) to avoid OOM errors.
>>>
>>> It's slower, of course, and requires tuning other config parameters. It
>>> can also be a problem if you do not have enough disk space, meaning that
>>> you have to unpersist at the right points if you are running long flows.
>>>
>>> For us, even though the disk writes are a performance hit, we prefer the
>>> Spark programming model to Hadoop M/R. But we are still working on getting
>>> this to work end to end on 100s of GB of data on our 16-node cluster.
>>>
>>> Suren
>>>
>>>
>>>
>>> On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS  wrote:
>>>
 Thanks for the input. I will give foldByKey a shot.

 The way I am doing is, data is partitioned hourly. So I am computing
 distinct values hourly. Then I use unionRDD to merge them and compute
 distinct on the overall data.

 > Is there a way to know which key,value pair is resulting in the OOM ?
 > Is there a way to set parallelism in the map stage so that, each
 worker will process one key at time. ?

 I didn't realise countApproxDistinctByKey is using hyperloglogplus.
 This should be interesting.

 --Vivek


 On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen  wrote:

> Grouping by key is always problematic since a key might have a huge
> number of values. You can do a little better than grouping *all* values 
> and
> *then* finding distinct values by using foldByKey, putting values into a
> Set. At least you end up with only distinct values in memory. (You don't
> need two maps either, right?)
>
> If the number of distinct values is still huge for some keys, consider
> the experimental method countApproxDistinctByKey:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285
>
> This should be much more performant at the cost of some accuracy.
>
>
> On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS  wrote:
>
>> Hi,
>>For last couple of days I have been trying hard to get around this
>> problem. Please share any insights on solving this problem.
>>
>> Problem :
>> There is a huge list of (key, value) pairs. I want to transform this
>> to (key, distinct values) and then eventually to (key, distinct values
>> count)
>>
>> On small dataset
>>
>> groupByKey().map( x => (x._1, x._2.distinct)) ...map(x => (x._1,
>> x._2.distinct.count))
>>
>> On large data set I am getting OOM.
>>
>> Is there a way to represent Seq of values from groupByKey as RDD and
>> then perform distinct over it ?
>>
>> Thanks
>> Vivek
>>
>
>

>>>
>>>
>>> --
>>>
>>> SUREN HIRAMAN, VP TECHNOLOGY
>>> Velos
>>> Accelerating Machine Learning
>>>
>>> 440 NINTH AVENUE, 11TH FLOOR
>>> NEW YORK, NY 10001
>>> O: (917) 525-2466 ext. 105
>>> F: 646.349.4063
>>> E: suren.hiraman@v elos.io
>>> W: www.velos.io
>>>
>>>
>>
>


Re: Kafka client - specify offsets?

2014-06-15 Thread Tobias Pfeiffer
Hi,

there are apparently helpers to tell you the offsets, but I have no idea how
to pass that to the Kafka stream consumer. I am
interested in that as well.

Tobias

On Thu, Jun 12, 2014 at 5:53 AM, Michael Campbell
 wrote:
> Is there a way in the Apache Spark Kafka Utils to specify an offset to start
> reading?  Specifically, from the start of the queue, or failing that, a
> specific point?