I can understand why your first query finishes without OOM, but the new one fails with OOM.
In the new query, you are asking for a groupByKey/cogroup operation, which forces all the productName + productCategory values for each userid to be sent to the same reducer. This can easily blow out the reducer's memory if one userid has a lot of productName and productCategory values.
Keep in mind that on the reducer side Spark still uses a hash structure to merge all the data coming from the different mappers, so the memory on the reduce side has to be able to hold all the productName + productCategory values for (at least) the most frequent userid. I also don't know why you want all the productName and productCategory values per userid; maybe a distinct would be enough?
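If duplicates are not needed, something along these lines (just a sketch of the idea, keeping the null guard from your query but only the productname column for brevity) would deduplicate per userid before the concatenation, using collect_set instead of collect_list:

// Sketch only: collect_set drops duplicate values per userid before they are
// concatenated, which can shrink the data each reducer has to hold.
val dedupedDF = sqlContext.sql(
  """
    |select userid,
    |       concat_ws(' ', collect_set(if(productname is not NULL, lower(productname), ''))) as inputlist
    |from landing
    |where dt = '2015-9' and userid != '' and userid is not null
    |  and pagetype = 'productDetail'
    |group by userid
  """.stripMargin)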
Imagine one userid shows up 1M times in your dataset, with 0.5M productname values of 'A' and 0.5M of 'B'. Your query will push all 1M 'A' and 'B' values into the same reducer and ask Spark to merge them in a HashMap for that userid. That will cause OOM.
Above all, you need to find out the maximum count per userid in your data, e.g.:
select max(cnt) from (select count(*) as cnt from landing where ..... group by userid) t
Your memory has to be able to hold that many productName and productCategory values, no matter how high you set the partition number (even as high as your unique count of userids), if consolidating all the productName and productCategory values per userid, without even considering duplicate removal, is really what you want.
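For example, something like this (just a sketch against your landing table, with the same filters) would show which userids dominate:

// Sketch only: count rows per userid and look at the heaviest keys; a single
// userid with millions of rows is the one that blows up the reducer receiving it.
val skewDF = sqlContext.sql(
  """
    |select userid, count(*) as cnt
    |from landing
    |where dt = '2015-9' and pagetype = 'productDetail'
    |group by userid
    |order by cnt desc
    |limit 20
  """.stripMargin)
skewDF.show()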
That said, both queries should still push a similar record count per partition, just with very different data volumes.
Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 18:56:51 +0300
CC: jingyu.zh...@news.com.au; user@spark.apache.org
To: java8...@hotmail.com

Yes, right, the query you wrote worked in the same cluster. In that case the partitions were equally distributed, but when I used the regex and concatenations they were not, as I said before. The query with concatenation is below:
val usersInputDF = sqlContext.sql(
  s"""
     |  select userid,
     |         concat_ws(' ', collect_list(concat_ws(' ',
     |           if(productname is not NULL, lower(productname), ''),
     |           lower(regexp_replace(regexp_replace(substr(productcategory, 2, length(productcategory) - 2), '\"', ''), \",\", ' '))))) inputlist
     |  from landing
     |  where dt='${dateUtil.getYear}-${dateUtil.getMonth}' and day >= '${day}'
     |    and userid != '' and userid is not null and userid is not NULL
     |    and pagetype = 'productDetail'
     |  group by userid
   """.stripMargin)

On 24 Sep 2015, at 16:52, java8964 <java8...@hotmail.com> wrote:
This is interesting.
So you mean that this query
"select userid from landing where dt='2015-9' and userid != '' and userid is 
not null and userid is not NULL and pagetype = 'productDetail' group by userid"
works in your cluster?
In this case, do you also see one task with way more data than the rest, as happened when you used the regex and concatenation?
It is hard to believe that just adding "regex" and "concatenation" would change how evenly the data is distributed across partitions. In your query, the distribution across partitions simply depends on the hash partitioner applied to "userid".
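Roughly speaking (this is just a sketch of the default hash-partitioning behaviour, not Spark's exact code), the shuffle partition for a row is derived from the userid's hash:

// Sketch only: roughly how a hash partitioner assigns a key to a partition.
// Every row with the same userid maps to the same shuffle partition, so one
// very frequent userid fills a single partition no matter how many partitions
// you configure.
def partitionFor(userid: String, numPartitions: Int): Int = {
  val mod = userid.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod // keep the result non-negative
}

So adding the regex/concatenation should not change which partition a userid lands in; it only changes how much data travels with it.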
Can you show us the query after you added the "regex" and "concatenation"?
Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 15:34:48 +0300
CC: user@spark.apache.org
To: jingyu.zh...@news.com.au; java8...@hotmail.com

@Jingyu
Yes, it works without the regex and concatenation, as in the query below. So, what can we understand from this? When I run it that way, the shuffle read sizes are equally distributed between partitions.
val usersInputDF = sqlContext.sql(
  s"""
     |  select userid
     |  from landing
     |  where dt='2015-9' and userid != '' and userid is not null and userid is not NULL
     |    and pagetype = 'productDetail'
     |  group by userid
   """.stripMargin)
@java8964
I tried with sql.shuffle.partitions = 10000, but no luck. Again, one partition's shuffle size is huge and the others are very small.

So how can I balance the shuffle read size between partitions?

On 24 Sep 2015, at 03:35, Zhang, Jingyu <jingyu.zh...@news.com.au> wrote:
Does your SQL work if it does not run a regex on the strings and concatenate them, i.e. if you just select the columns without the string operations?

On 24 September 2015 at 10:11, java8964 <java8...@hotmail.com> wrote:
Try to increase the partition count; that will make each partition hold less data.
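For example (a minimal sketch, assuming the reduce tasks come from the spark.sql.shuffle.partitions setting that gives you the 200 tasks):

// Sketch only: raise the number of shuffle partitions before running the
// query, so each reduce task receives a smaller slice of the grouped data.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

Note that this only helps if the data is not heavily skewed on the grouping key.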
Yong

Subject: Re: Java Heap Space Error
From: yu...@useinsider.com
Date: Thu, 24 Sep 2015 00:32:47 +0300
CC: user@spark.apache.org
To: java8...@hotmail.com

Yes, it’s possible. I use S3 as the data source, and my external tables are partitioned. The task below is 193/200: the job has 2 stages, and this is the 193rd task of 200 in the 2nd stage, because of sql.shuffle.partitions.
How can I avoid this situation? This is my query:
select userid,
       concat_ws(' ', collect_list(concat_ws(' ',
         if(productname is not NULL, lower(productname), ''),
         lower(regexp_replace(regexp_replace(substr(productcategory, 2, length(productcategory) - 2), '\"', ''), \",\", ' '))))) inputlist
from landing
where dt='2015-9' and userid != '' and userid is not null and userid is not NULL
  and pagetype = 'productDetail'
group by userid

On 23 Sep 2015, at 23:55, java8964 <java8...@hotmail.com> wrote:
Based on your description, your job shouldn't have any shuffle then, as you just apply regex and concatenation on the column, yet there is one partition with 4.3M records to be read vs. less than 1M records for the other partitions.
Is that possible? It depends on the source of your data.
If there is a shuffle in your query (more than 2 stages generated by your query, and this is my guess of what is happening), then it simply means that one partition has way more data than the rest of the partitions.
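One quick way to check is to print the query plan (just a sketch; I'm assuming the query result is held in a DataFrame, here called usersInputDF) and look for an Exchange operator, which indicates a shuffle:

// Sketch only: print the extended plan; an Exchange operator in the physical
// plan means the query shuffles data across partitions (a group by normally adds one).
usersInputDF.explain(true)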
Yong
From: yu...@useinsider.com
Subject: Java Heap Space Error
Date: Wed, 23 Sep 2015 23:07:17 +0300
To: user@spark.apache.org

What can cause the issue in the attached picture? I'm running an SQL query which runs a regex on strings and concatenates them. Because of this task, my job gives a Java heap space error.
<Screen Shot 2015-09-23 at 23.03.18.png>

