[ https://issues.apache.org/jira/browse/SPARK-26164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700003#comment-16700003 ]

Cheng Su commented on SPARK-26164:
----------------------------------

_> I think we can follow aggregate, try hash writer first, and fallback to sort 
when we are short of memory_

This is a good idea. After checking the aggregation implementation, I think we 
can do the following when writing a table (i.e. in FileFormatDataWriter and 
FileFormatWriter):

1. Maintain a mapping between file path and output writer, and re-use the 
writer when writing each input row. If the number of open output writers 
exceeds a threshold (configurable), go to 2.

2. Sort the remaining input rows (using the same sorter as SortExec). Then 
write out the sorted rows, closing each writer on the fly once there are no 
more rows for its file path.

* I use the number of open output writers here as a proxy for memory usage. 
Let me know if there's a better way to detect that the executor is short of 
memory. A sketch of the approach follows below.
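A minimal Scala sketch of the two-step approach. The Row and OutputWriter 
types are hypothetical stand-ins for Spark's InternalRow and internal writer 
(the real change would live in FileFormatDataWriter), maxOpenWriters is an 
illustrative name for the threshold config, and the in-memory sortBy stands 
in for SortExec's external sorter:

    import scala.collection.mutable

    // Hypothetical stand-ins for Spark internals; the real code deals in
    // InternalRow and datasources.OutputWriter.
    case class Row(path: String, data: String)

    class OutputWriter(path: String) {
      def write(row: Row): Unit = println(s"$path <- ${row.data}")
      def close(): Unit = println(s"closed $path")
    }

    object HashThenSortWriter {
      // Proxy for executor memory pressure; configurable in the real design.
      val maxOpenWriters = 3

      def writeAll(rows: Iterator[Row]): Unit = {
        val writers = mutable.Map.empty[String, OutputWriter]
        var fellBack = false

        // Step 1: keep a path -> writer map and re-use writers per path.
        while (!fellBack && rows.hasNext) {
          val row = rows.next()
          writers.get(row.path) match {
            case Some(w) => w.write(row)
            case None if writers.size < maxOpenWriters =>
              val w = new OutputWriter(row.path)
              writers(row.path) = w
              w.write(row)
            case None =>
              // Step 2: too many open writers -- sort whatever is left and
              // write it grouped by path, closing writers on the fly.
              sortedFallback(Iterator.single(row) ++ rows, writers)
              fellBack = true
          }
        }
        if (!fellBack) writers.values.foreach(_.close())
      }

      // In-memory sort stands in for SortExec's external sorter.
      private def sortedFallback(
          rows: Iterator[Row],
          writers: mutable.Map[String, OutputWriter]): Unit = {
        var currentPath: String = null
        rows.toSeq.sortBy(_.path).foreach { row =>
          if (row.path != currentPath) {
            // The previous path gets no more rows: close its writer now.
            Option(currentPath).foreach(p => writers.remove(p).foreach(_.close()))
            currentPath = row.path
            writers.getOrElseUpdate(currentPath, new OutputWriter(currentPath))
          }
          writers(currentPath).write(row)
        }
        // Close the last path's writer plus any writers never revisited.
        writers.values.foreach(_.close())
      }
    }

For example, with maxOpenWriters = 3, a stream touching a fourth distinct 
path would trip the fallback, sort the remainder once, and close each writer 
as soon as its path group is finished.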

Do you think we are good to go ahead and implement the above design? cc 
[~cloud_fan], [~tejasp] and [~sameerag], thanks!

> [SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-26164
>                 URL: https://issues.apache.org/jira/browse/SPARK-26164
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Cheng Su
>            Priority: Minor
>
> Problem:
> Currently, Spark always requires a local sort on the partition/bucket 
> columns before writing to an output table [1]. The disadvantage is that the 
> sort may waste reserved CPU time on the executor due to spill. Hive does 
> not require a local sort before writing the output table [2], and we saw a 
> performance regression when migrating Hive workloads to Spark.
>  
> Proposal:
> We can avoid the local sort by keeping a mapping between file path and 
> output writer. When a row targets a new file path, we create a new output 
> writer; otherwise, we re-use the existing writer for that path (the main 
> change should be in FileFormatDataWriter.scala). This is very similar to 
> what Hive does in [2].
> Since the new behavior (avoiding the sort by keeping multiple output 
> writers) consumes more memory on the executor (multiple output writers need 
> to be open at the same time) than the current behavior (only one output 
> writer open), we can add a config to switch between the current and new 
> behavior, as sketched below.
>  
> [1]: spark FileFormatWriter.scala - 
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L123]
> [2]: hive FileSinkOperator.java - 
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java#L510]
>  
>  


