Re: Slower performance when bigger memory?

2015-04-24 Thread Shawn Zheng
This is not about the GC issue itself. The memory is

On Friday, April 24, 2015, Evo Eftimov  wrote:

> You can resort to serialized storage (still in memory) of your RDDs – this
> will greatly reduce GC pressure, since the RDD elements are stored as
> serialized byte arrays rather than as graphs of Java objects (and with
> off-heap storage they can live outside the JVM heap entirely, e.g. in
> Tachyon, the distributed in-memory file system Spark can use internally)
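>
> For example, a minimal sketch using the standard persist API (myRdd below is
> just a placeholder for your own RDD):
>
> import org.apache.spark.storage.StorageLevel
>
> // Keep the RDD in memory as serialized byte arrays instead of deserialized
> // Java objects: far fewer live objects for the GC to scan, at the cost of
> // extra CPU for (de)serialization.
> // (StorageLevel.OFF_HEAP would instead put the serialized blocks in Tachyon.)
> val cached = myRdd.persist(StorageLevel.MEMORY_ONLY_SER)
> cached.count() // materialize the cache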
>
>
>
> Also review the object-oriented model of your RDD elements to see whether it
> consists of too many redundant objects and levels of hierarchy – in
> high-performance, distributed frameworks like Spark, some of the classic
> “OO patterns” represent an unnecessary burden.
>
>
>
> *From:* Shuai Zheng [mailto:szheng.c...@gmail.com
> ]
> *Sent:* Thursday, April 23, 2015 6:14 PM
> *To:* user@spark.apache.org
> 
> *Subject:* Slower performance when bigger memory?
>
>
>
> Hi All,
>
>
>
> I am running some benchmarks on an r3.8xlarge instance. I have a cluster with
> one master (no executor on it) and one slave (r3.8xlarge).
>
>
>
> My job has 1000 tasks in stage 0.
>
>
>
> An r3.8xlarge has 244 GB of memory and 32 cores.
>
>
>
> If I create 4 executors, each with 8 cores and 50 GB of memory, each task
> takes around 320–380 s. If I instead use one big executor with 32 cores and
> 200 GB of memory, each task takes 760–900 s.
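>
> For reference, a rough sketch of how the two layouts might be written down
> (illustrative only; how you end up with 4 executors on one node depends on
> the cluster manager, e.g. multiple worker instances on standalone or
> spark.executor.instances on YARN):
>
> import org.apache.spark.SparkConf
>
> // Layout A: several smaller executors per node (8 cores, 50 GB each)
> val smallExecutors = new SparkConf()
>   .set("spark.executor.cores", "8")
>   .set("spark.executor.memory", "50g")
>
> // Layout B: one large executor per node (32 cores, 200 GB)
> val oneBigExecutor = new SparkConf()
>   .set("spark.executor.cores", "32")
>   .set("spark.executor.memory", "200g")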
>
>
>
> And when I check the logs, it looks like minor GC takes much longer when
> using 200 GB of memory:
>
>
>
> 285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)]
> 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95
> sys=120.65, real=11.25 secs]
>
>
>
> And when it uses 50 GB of memory, each minor GC takes less than 1 s.
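>
> (GC lines like the one above are typically produced by JVM GC logging; a
> minimal sketch of enabling it on the executors, assuming the standard
> spark.executor.extraJavaOptions setting and stock HotSpot flags:)
>
> import org.apache.spark.SparkConf
>
> // Print one line per GC event, with timestamps, into the executor logs.
> val conf = new SparkConf()
>   .set("spark.executor.extraJavaOptions",
>        "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")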
>
>
>
> I am trying to work out the best way to configure Spark. For some special
> reasons I am tempted to use more memory on a single executor, provided there
> is no significant performance penalty. But now it looks like there is one?
>
>
>
> Does anyone have any idea?
>
>
>
> Regards,
>
>
>
> Shuai
>


Re: Process time series RDD after sortByKey

2015-03-17 Thread Shawn Zheng
Hi Imran,
This is extremely helpful. It is not only an approach; it also helps me
understand how to shape and customize my own DAG effectively.

Thanks a lot!

Shuai

On Monday, March 16, 2015, Imran Rashid  wrote:

> Hi Shuai,
>
> yup, that is exactly what I meant -- implement your own class
> MyGroupingRDD.  This is more detail than a lot of users will ever need to
> get into, but it's also not all that scary.  In this case, you want
> something that is *extremely* close to the existing CoalescedRDD, so start
> by looking at that code.
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
>
> The only thing which is complicated in CoalescedRDD is the
> PartitionCoalescer, but that is completely irrelevant for you, so you can
> ignore it.  I started writing up a description of what to do but then I
> realized just writing the code would be easier :)  Totally untested, but
> here you go:
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3
>
> The only really interesting part here is getPartitions:
>
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31
>
> That's where you create the partitions of your new RDD, each of which depends
> on multiple partitions of the parent.  Also note that compute() is very simple:
> you just concatenate the iterators from each of the parent partitions:
>
>
> https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37
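>
> In other words, a rough, untested sketch of the same idea looks something
> like this (class and member names here are illustrative, not necessarily the
> ones used in the gist):
>
> import scala.reflect.ClassTag
>
> import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
> import org.apache.spark.rdd.RDD
>
> // One output partition, remembering which parent partition indices it covers.
> class GroupedPartition(override val index: Int, val parentIndices: Seq[Int])
>   extends Partition
>
> // Groups every `groupSize` consecutive parent partitions into one partition,
> // preserving their order.
> class MyGroupingRDD[T: ClassTag](parent: RDD[T], groupSize: Int)
>   extends RDD[T](parent.context, Nil) {
>
>   override def getPartitions: Array[Partition] =
>     parent.partitions.indices
>       .grouped(groupSize)
>       .zipWithIndex
>       .map { case (parentIdxs, i) => new GroupedPartition(i, parentIdxs): Partition }
>       .toArray
>
>   // Narrow dependency: each output partition reads only its own block of parents.
>   override def getDependencies: Seq[Dependency[_]] = Seq(
>     new NarrowDependency[T](parent) {
>       override def getParents(partitionId: Int): Seq[Int] =
>         partitions(partitionId).asInstanceOf[GroupedPartition].parentIndices
>     }
>   )
>
>   // Concatenate the iterators of the grouped parent partitions, in order.
>   override def compute(split: Partition, context: TaskContext): Iterator[T] =
>     split.asInstanceOf[GroupedPartition].parentIndices.iterator.flatMap { i =>
>       parent.iterator(parent.partitions(i), context)
>     }
> }
>
> // e.g. new MyGroupingRDD(rawData1000Partitions, 50) would give 20 partitions
> // when the parent has 1000.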
>
> let me know how it goes!
>
>
> On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng  > wrote:
>
>> Hi Imran,
>>
>>
>>
>> I am a bit confused here. Assume I have RDD a with 1000 partitions, and it
>> has also been sorted. When creating RDD b (with 20 partitions), how can I
>> make sure that partitions 1–50 of RDD a map to the 1st partition of RDD b?
>> I don’t see any control code/logic here.
>>
>>
>>
>> Your code below:
>>
>>
>>
>> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>>
>>
>>
>>
>>
>> Does it mean I need to define/develop my own MyGroupingRDD class? I am not
>> very clear on how to do that; is there any place I can find an example? I
>> have never created my own RDD class before (only RDD instances :) ). But
>> this is a very valuable approach for me, so I am eager to learn.
>>
>>
>>
>> Regards,
>>
>>
>>
>> Shuai
>>
>>
>>
>> *From:* Imran Rashid [mailto:iras...@cloudera.com
>> ]
>> *Sent:* Monday, March 16, 2015 11:22 AM
>> *To:* Shawn Zheng; user@spark.apache.org
>> 
>> *Subject:* Re: Process time series RDD after sortByKey
>>
>>
>>
>> Hi Shuai,
>>
>>
>>
>> On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng > > wrote:
>>
>> Sorry for the late response.
>>
>> Zhan Zhang's solution is very interesting and I looked into it, but it is
>> not what I want. Basically I want to run the job sequentially and still gain
>> parallelism. So, if possible, if I have 1000 partitions, the best case is
>> that I can run it as 20 subtasks, each one taking partitions 1–50, 51–100,
>> 101–150, etc.
>>
>> If we have the ability to do this, we gain huge flexibility when we try to
>> process time-series-like data, and a lot of algorithms will benefit from
>> it.
>>
>>
>>
>> yes, this is what I was suggesting you do.  You would first create one
>> RDD (a) that has 1000 partitions.  Don't worry about the creation of this
>> RDD -- it won't create any tasks, it's just a logical holder of your raw
>> data.  Then you create another RDD (b) that depends on your RDD (a), but
>> that only has 20 partitions.  Each partition in (b) would depend on a
>> number of partitions from (a).  As you've suggested, partition 1 in (b)
>> would depend on partitions 1-50 in (a), partition 2 in (b) would depend on
>> 51-100 in (a), etc.   Note that RDD (b) still doesn't *do* anything.  It's
>> just another logical holder for your data, but this time grouped in the way
>> you want.  Then after RDD (b), you would do whatever other transformations
>> you wanted, but now you'd be working with 20 partitions:
>>
>>
>>
>> val rawData1000Partitions = sc.textFile(...) // or whatever
>>
>> val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
>>
>> groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.
>>
>>
>>
>> note that this is almost exactly the same as what CoalescedRDD does.
>> However, it might combine the 

Re: jar conflict with Spark default packaging

2015-03-16 Thread Shawn Zheng
Thanks a lot. I will give it a try!

On Monday, March 16, 2015, Adam Lewandowski 
wrote:

> Prior to 1.3.0, Spark has 'spark.files.userClassPathFirst' for non-YARN
> apps. For 1.3.0, use 'spark.executor.userClassPathFirst'.
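>
> For example, a minimal sketch of setting it programmatically (the config keys
> are the ones named above; passing them with spark-submit --conf works the
> same way, and the app name below is just a placeholder):
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> // Ask executors to prefer the user's jars over the classes bundled in the
> // Spark assembly when resolving classes.
> val conf = new SparkConf()
>   .setAppName("aws-job") // placeholder
>   .set("spark.executor.userClassPathFirst", "true") // Spark 1.3.0+
>   // .set("spark.files.userClassPathFirst", "true") // pre-1.3.0, non-YARN
> val sc = new SparkContext(conf)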
>
> See
> https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCALrvLxdWwSByxNvcZtTVo8BsNRR_7tbPzWdUiAV8Ps8H1oAayQ%40mail.gmail.com%3E
>
> On Fri, Mar 13, 2015 at 1:04 PM, Shuai Zheng  > wrote:
>
>> Hi All,
>>
>>
>>
>> I am running Spark to work with AWS.
>>
>>
>>
>> And the latest AWS SDK version works with httpclient 3.4+. But the
>> spark-assembly-*.jar file packages an old httpclient version, which gives
>> me a ClassNotFoundException for
>> org/apache/http/client/methods/HttpPatch
>>
>>
>>
>> Even when I put the right httpclient jar there, it doesn’t help because
>> Spark always takes the class from its own packaging first.
>>
>>
>>
>> I don’t know why Spark only provides one big package that doesn’t allow us
>> to customize the library loading order. I know I can just rebuild Spark,
>> but this is very troublesome, and it should not be the general long-term
>> solution (I can’t rebuild the Spark jar every time I have a jar conflict,
>> since Spark is deployed as a shared cluster installation).
>>
>>
>>
>> In Hadoop, we have “mapreduce.job.user.classpath.first=true”. But
>> “spark.yarn.user.classpath.first” only works for YARN.
>>
>>
>>
>> I think I am not the only one who faces this issue. Does anyone have a more
>> general solution for this?
>>
>>
>>
>> Regards,
>>
>>
>>
>> Shuai
>>
>>
>>
>>
>>
>
>