RE: Spark SQL. Memory consumption

2015-04-02 Thread java8964
It is hard to say what the reason could be without more detailed information. If you 
provide some more details, people here may be able to help you better.

1) What is your worker's memory setting? It looks like your nodes have 128G of 
physical memory each, but what did you specify for the worker's heap size? If you 
can paste your spark-env.sh and spark-defaults.conf content here (something like 
the sample below), it will be helpful.

2) You are joining 2 tables. 8G of parquet files is small compared to the heap you 
gave, but is that the size of one table or of both? Is the data compressed?

3) Your join key is different from your grouping keys, so my assumption is that 
this query should lead to 4 stages (I could be wrong, as I am fairly new to Spark 
SQL too). Is that right? If so, in which stage did the OOM happen? That information 
would help us judge which part caused the OOM.

4) When you set spark.shuffle.partitions to 1024, did stages 3 and 4 really create 
1024 tasks?

5) When the OOM happens, please paste at least the stack trace of the OOM; it will 
help people here figure out which part of Spark leads to the OOM and give you 
better suggestions.
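Just to illustrate what to paste (these are the standard standalone/config knobs; 
the values are placeholders, not recommendations):

spark-env.sh:
    export SPARK_WORKER_MEMORY=100g    # total memory the standalone worker may hand to executors on each 128G node
    export SPARK_WORKER_CORES=16

spark-defaults.conf:
    spark.driver.memory            20g
    spark.executor.memory          60g
    spark.sql.shuffle.partitions   1024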
Thanks
Yong

Date: Thu, 2 Apr 2015 17:46:48 +0200
Subject: Spark SQL. Memory consumption
From: masfwo...@gmail.com
To: user@spark.apache.org

Hi. 
I'm using Spark SQL 1.2. I have this query:
CREATE TABLE test_MA STORED AS PARQUET AS
SELECT field1, field2, field3, field4, field5,
    COUNT(1) AS field6, MAX(field7), MIN(field8),
    SUM(field9 / 100), COUNT(field10),
    SUM(IF(field11  -500, 1, 0)), MAX(field12),
    SUM(IF(field13 = 1, 1, 0)),
    SUM(IF(field13 in (3,4,5,6,10,104,105,107), 1, 0)),
    SUM(IF(field13 = 2012, 1, 0)),
    SUM(IF(field13 in (0,100,101,102,103,106), 1, 0))
FROM table1 CL
    JOIN table2 netw ON CL.field15 = netw.id
WHERE
    AND field3 IS NOT NULL
    AND field4 IS NOT NULL
    AND field5 IS NOT NULL
GROUP BY field1, field2, field3, field4, netw.field5

spark-submit --master spark://master:7077 --driver-memory 20g --executor-memory 
60g --class GMain project_2.10-1.0.jar --driver-class-path 
'/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options 
'-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*' 2 
./error


The input data is 8GB in Parquet format. It crashes many times with GC overhead 
errors. I've set spark.shuffle.partitions to 1024, but my worker nodes (with 128GB 
RAM per node) still collapse.
Is this query too difficult for Spark SQL? Would it be better to do it in Spark? 
Am I doing something wrong?

Thanks
--


Regards.
Miguel Ángel
  

Re: Spark SQL. Memory consumption

2015-04-02 Thread Vladimir Rodionov
 Using large memory for executors (*--executor-memory 120g*).

Not really good advice.

On Thu, Apr 2, 2015 at 9:17 AM, Cheng, Hao hao.ch...@intel.com wrote:

 Spark SQL tries to load the entire partition's data and organize it as
 in-memory HashMaps; it does eat a lot of memory if there are not many
 duplicated group-by keys across a large number of records.



 A couple of things you can try, case by case:

 · Increase the number of partitions (the record count in each partition will drop).

 · Use more memory for the executors (*--executor-memory 120g*).

 · Reduce the Spark cores (to reduce the number of task threads running in parallel).



 We are trying to improve this with sort-merge aggregation, which
 should reduce the memory utilization significantly, but that is still
 ongoing.



 Cheng Hao



 *From:* Masf [mailto:masfwo...@gmail.com]
 *Sent:* Thursday, April 2, 2015 11:47 PM
 *To:* user@spark.apache.org
 *Subject:* Spark SQL. Memory consumption




  Hi.



 I'm using Spark SQL 1.2. I have this query:



 CREATE TABLE test_MA STORED AS PARQUET AS

  SELECT

 field1

 ,field2

 ,field3

 ,field4

 ,field5

 ,COUNT(1) AS field6

 ,MAX(field7)

 ,MIN(field8)

 ,SUM(field9 / 100)

 ,COUNT(field10)

 ,SUM(IF(field11  -500, 1, 0))

 ,MAX(field12)

 ,SUM(IF(field13 = 1, 1, 0))

 ,SUM(IF(field13 in (3,4,5,6,10,104,105,107), 1, 0))

 ,SUM(IF(field13 = 2012 , 1, 0))

 ,SUM(IF(field13 in (0,100,101,102,103,106), 1, 0))



 FROM table1 CL

JOIN table2 netw

 ON CL.field15 = netw.id

 WHERE

 AND field3 IS NOT NULL

 AND field4 IS NOT NULL

 AND field5 IS NOT NULL

 GROUP BY field1,field2,field3,field4, netw.field5





 spark-submit --master spark://master:7077 *--driver-memory 20g
 --executor-memory 60g* --class GMain project_2.10-1.0.jar
 --driver-class-path '/opt/cloudera/parcels/CDH/lib/hive/lib/*'
 --driver-java-options
 '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*'
 2 ./error





 The input data is 8GB in Parquet format. It crashes many times with *GC overhead*
 errors. I've set spark.shuffle.partitions to 1024, but my worker nodes (with 128GB
 RAM per node) still collapse.



 *Is this query too difficult for Spark SQL?*

 *Would it be better to do it in Spark?*

 *Am I doing something wrong?*





 Thanks

 --




 Regards.
 Miguel Ángel



RE: Spark SQL. Memory consumption

2015-04-02 Thread Cheng, Hao
Spark SQL tries to load the entire partition's data and organize it as in-memory 
HashMaps; it does eat a lot of memory if there are not many duplicated group-by 
keys across a large number of records.

A couple of things you can try, case by case (a rough sketch follows below):

· Increase the number of partitions (the record count in each partition will drop).

· Use more memory for the executors (--executor-memory 120g).

· Reduce the Spark cores (to reduce the number of task threads running in parallel).

We are trying to improve this with sort-merge aggregation, which should reduce the 
memory utilization significantly, but that is still ongoing.
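As a rough illustration only (the numbers are placeholders that need tuning case by 
case, not recommendations), those suggestions could be applied like this:

# in spark-defaults.conf: more partitions for Spark SQL shuffles/aggregations,
# so each partition holds fewer records
spark.sql.shuffle.partitions   2048

# on submit: bigger executors, and cap total cores to limit parallel task threads
spark-submit --master spark://master:7077 \
    --executor-memory 120g \
    --total-executor-cores 32 \
    --class GMain project_2.10-1.0.jar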

Cheng Hao

From: Masf [mailto:masfwo...@gmail.com]
Sent: Thursday, April 2, 2015 11:47 PM
To: user@spark.apache.org
Subject: Spark SQL. Memory consumption


Hi.

I'm using Spark SQL 1.2. I have this query:

CREATE TABLE test_MA STORED AS PARQUET AS
 SELECT
field1
,field2
,field3
,field4
,field5
,COUNT(1) AS field6
,MAX(field7)
,MIN(field8)
,SUM(field9 / 100)
,COUNT(field10)
,SUM(IF(field11  -500, 1, 0))
,MAX(field12)
,SUM(IF(field13 = 1, 1, 0))
,SUM(IF(field13 in (3,4,5,6,10,104,105,107), 1, 0))
,SUM(IF(field13 = 2012 , 1, 0))
,SUM(IF(field13 in (0,100,101,102,103,106), 1, 0))

FROM table1 CL
   JOIN table2 netw
ON CL.field15 = netw.id
WHERE
AND field3 IS NOT NULL
AND field4 IS NOT NULL
AND field5 IS NOT NULL
GROUP BY field1,field2,field3,field4, netw.field5


spark-submit --master spark://master:7077 --driver-memory 20g --executor-memory 
60g --class GMain project_2.10-1.0.jar --driver-class-path 
'/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options 
'-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*' 2 
./error


The input data is 8GB in Parquet format. It crashes many times with GC overhead 
errors. I've set spark.shuffle.partitions to 1024, but my worker nodes (with 128GB 
RAM per node) still collapse.

Is this query too difficult for Spark SQL?
Would it be better to do it in Spark?
Am I doing something wrong?


Thanks
--


Regards.
Miguel Ángel