[ 
https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511533#comment-16511533
 ] 

Wenchen Fan commented on SPARK-24528:
-------------------------------------

I have 2 ideas:
1. provide an option to let Spark only produce one file per bucket, if users 
are willing to write slower but read faster. But we need to carefully design 
what we should do for appending data.
2. implement a special merge-sort operator to sort the files in one bucket 
faster.

Both of them are not trivial, feel free to write a design doc if you want to 
drive this project.

> Missing optimization for Aggregations/Windowing on a bucketed table
> -------------------------------------------------------------------
>
>                 Key: SPARK-24528
>                 URL: https://issues.apache.org/jira/browse/SPARK-24528
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Ohad Raviv
>            Priority: Major
>
> Closely related to  SPARK-24410, we're trying to optimize a very common use 
> case we have of getting the most updated row by id from a fact table.
> We're saving the table bucketed to skip the shuffle stage, but we're still 
> "waste" time on the Sort operator evethough the data is already sorted.
> here's a good example:
> {code:java}
> sparkSession.range(N).selectExpr(
>   "id as key",
>   "id % 2 as t1",
>   "id % 3 as t2")
>     .repartition(col("key"))
>     .write
>   .mode(SaveMode.Overwrite)
>     .bucketBy(3, "key")
>     .sortBy("key", "t1")
>     .saveAsTable("a1"){code}
> {code:java}
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, 
> key#24L, t1, t1#25L, t2, t2#26L))])
> +- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, 
> t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
> +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, 
> Format: Parquet, Location: ...{code}
>  
> and here's a bad example, but more realistic:
> {code:java}
> sparkSession.sql("set spark.sql.shuffle.partitions=2")
> sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain
> == Physical Plan ==
> SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, 
> key#32L, t1, t1#33L, t2, t2#34L))])
> +- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, 
> t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
> +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
> +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, 
> Format: Parquet, Location: ...
> {code}
>  
> I've traced the problem to DataSourceScanExec#235:
> {code:java}
> val sortOrder = if (sortColumns.nonEmpty) {
>   // In case of bucketing, its possible to have multiple files belonging to 
> the
>   // same bucket in a given relation. Each of these files are locally sorted
>   // but those files combined together are not globally sorted. Given that,
>   // the RDD partition will not be sorted even if the relation has sort 
> columns set
>   // Current solution is to check if all the buckets have a single file in it
>   val files = selectedPartitions.flatMap(partition => partition.files)
>   val bucketToFilesGrouping =
>     files.map(_.getPath.getName).groupBy(file => 
> BucketingUtils.getBucketId(file))
>   val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 
> 1){code}
> so obviously the code avoids dealing with this situation now..
> could you think of a way to solve this or bypass it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to