[ 
https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554340#comment-16554340
 ] 

Takeshi Yamamuro commented on SPARK-24906:
------------------------------------------

Ah, I see. It make some sense to me. In DataSourceScanExec, in case that 
`requiredSchema` has the relatively smaller number of columns than 
`dataSchema`, we might consider an additional term to make `maxSplitBytes` 
larger in `createNonBucketedReadRDD`.
https://github.com/apache/spark/blob/3d5c61e5fd24f07302e39b5d61294da79aa0c2f9/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L316

WDYT? [~smilegator]  [~viirya] 

> Enlarge split size for columnar file to ensure the task read enough data
> ------------------------------------------------------------------------
>
>                 Key: SPARK-24906
>                 URL: https://issues.apache.org/jira/browse/SPARK-24906
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Jason Guo
>            Priority: Critical
>         Attachments: image-2018-07-24-20-26-32-441.png, 
> image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, 
> image-2018-07-24-20-30-24-552.png
>
>
> For columnar file, such as, when spark sql read the table, each split will be 
> 128 MB by default since spark.sql.files.maxPartitionBytes is default to 
> 128MB. Even when user set it to a large value, such as 512MB, the task may 
> read only few MB or even hundreds of KB. Because the table (Parquet) may 
> consists of dozens of columns while the SQL only need few columns. And spark 
> will prune the unnecessary columns.
>  
> In this case, spark DataSourceScanExec can enlarge maxPartitionBytes 
> adaptively. 
> For example, there is 40 columns , 20 are integer while another 20 are long. 
> When use query on an integer type column and an long type column, the 
> maxPartitionBytes should be 20 times larger. (20*4+20*8) /  (4+8) = 20. 
>  
> With this optimization, the number of task will be smaller and the job will 
> run faster. More importantly, for a very large cluster (more the 10 thousand 
> nodes), it will relieve RM's schedule pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB 
> data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job last only 30 
> seconds. It almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to