[ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554972#comment-16554972 ]

Jason Guo commented on SPARK-24906:
-----------------------------------

Thanks [~maropu] and [~viirya] for your comments. Here is my solution for our 
clusters (more than 40 thousand nodes in total, and more than 10 thousand nodes 
in a single cluster). 

1. When this is enabled

There is a configuration item named 

 
{code:java}
spark.sql.parquet.adaptiveFileSplit=false{code}
 

Only when this is enabled will DataSourceScanExec enlarge maxPartitionBytes. By 
default, the parameter is set to false. (Our ad-hoc queries set it to true.)
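A minimal sketch of how we enable it for an ad-hoc session (the key name 
follows the proposal above; it is not an existing Spark configuration):

{code:scala}
// Proposed switch from this ticket; off by default, so regular jobs are unaffected.
spark.conf.set("spark.sql.parquet.adaptiveFileSplit", "true")
{code}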

 

2. How to calculate maxPartitionBytes and openCostInBytes

Different data types have different lengths (calculated with 
DataType.defaultSize). First we get the total default size of a full row of the 
table (henceforth referred to as "T"). Then we get the total default size of 
the columns in requiredSchema (henceforth referred to as "R"). The multiplier 
is T / R, so maxPartitionBytes and openCostInBytes are both enlarged by a 
factor of T / R.
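A hedged sketch of the T / R computation, assuming the full table schema and 
the pruned requiredSchema are available as StructType values (the helper name 
and the scaled variables below are illustrative, not the actual 
DataSourceScanExec code):

{code:scala}
import org.apache.spark.sql.types._

// T / R: per-row default size of the whole table divided by the per-row
// default size of the columns the query actually needs.
def splitSizeMultiplier(tableSchema: StructType, requiredSchema: StructType): Double = {
  val t = tableSchema.fields.map(_.dataType.defaultSize.toLong).sum
  val r = requiredSchema.fields.map(_.dataType.defaultSize.toLong).sum.max(1L)
  t.toDouble / r
}

// Example from the ticket: 20 int columns + 20 long columns, query touches one of each.
val table = StructType(
  (1 to 20).map(i => StructField(s"i$i", IntegerType)) ++
  (1 to 20).map(i => StructField(s"l$i", LongType)))
val required = StructType(Seq(StructField("i1", IntegerType), StructField("l1", LongType)))

val multiplier = splitSizeMultiplier(table, required)  // (20*4 + 20*8) / (4 + 8) = 20.0

// Enlarge the two split-size knobs by the multiplier (128 MB and 4 MB are the
// Spark defaults for spark.sql.files.maxPartitionBytes and openCostInBytes).
val maxPartitionBytes = (128L * 1024 * 1024 * multiplier).toLong
val openCostInBytes   = (4L * 1024 * 1024 * multiplier).toLong
{code}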


> Enlarge split size for columnar file to ensure the task read enough data
> ------------------------------------------------------------------------
>
>                 Key: SPARK-24906
>                 URL: https://issues.apache.org/jira/browse/SPARK-24906
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Jason Guo
>            Priority: Critical
>         Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For a columnar file format such as Parquet, when Spark SQL reads a table, each 
> split will be 128 MB by default since spark.sql.files.maxPartitionBytes defaults 
> to 128 MB. Even when the user sets it to a larger value, such as 512 MB, a task 
> may read only a few MB or even hundreds of KB, because the (Parquet) table may 
> consist of dozens of columns while the SQL only needs a few of them, and Spark 
> will prune the unnecessary columns.
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, suppose there are 40 columns: 20 are integer and the other 20 are 
> long. When a query reads one integer column and one long column, 
> maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. 
>  
> With this optimization, the number of tasks will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more than 10 thousand 
> nodes), it will relieve the RM's scheduling pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and receives more than 5 TB of 
> data each hour.
> When we issue a very simple query: 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB of data.
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job lasts only 30 
> seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of data read per task is 44.2 MB. 
> !image-2018-07-24-20-30-24-552.png!
>  


