Ohad Raviv created SPARK-24528:
----------------------------------

             Summary: Missing optimization for Aggregations/Windowing on a bucketed table
                 Key: SPARK-24528
                 URL: https://issues.apache.org/jira/browse/SPARK-24528
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.3.0, 2.4.0
            Reporter: Ohad Raviv


Closely related to SPARK-24410: we're trying to optimize a very common use
case of ours, getting the most recent row per id from a fact table.

We save the table bucketed to skip the shuffle stage, but we still "waste"
time on the Sort operator even though the data is already sorted.

Here's a good example:
{code:java}
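import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// N is the number of rows to generate
sparkSession.range(N).selectExpr(
    "id as key",
    "id % 2 as t1",
    "id % 3 as t2")
  .repartition(col("key"))
  .write
  .mode(SaveMode.Overwrite)
  .bucketBy(3, "key")
  .sortBy("key", "t1")
  .saveAsTable("a1"){code}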
{code:java}
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
+- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
   +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: ...{code}
 

And here's a bad, but more realistic, example:
{code:java}
sparkSession.sql("set spark.sql.shuffle.partitions=2")
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
+- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
   +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
      +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, Format: Parquet, Location: ...

{code}
 

I've traced the problem to DataSourceScanExec#235:
{code:java}
val sortOrder = if (sortColumns.nonEmpty) {
  // In case of bucketing, its possible to have multiple files belonging to the
  // same bucket in a given relation. Each of these files are locally sorted
  // but those files combined together are not globally sorted. Given that,
  // the RDD partition will not be sorted even if the relation has sort columns set
  // Current solution is to check if all the buckets have a single file in it

  val files = selectedPartitions.flatMap(partition => partition.files)
  val bucketToFilesGrouping =
    files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
  val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1){code}
So the code currently sidesteps this situation: it treats the scan output as
sorted only when every bucket consists of a single file.
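
To make the condition concrete, here is a minimal standalone sketch of that
single-file-per-bucket check, with no Spark dependency. It is not the Spark
implementation: the object name, the helper bucketIdOf (standing in for
BucketingUtils.getBucketId), and the file-name regex are assumptions made for
illustration only.

```scala
// Standalone sketch of the single-file-per-bucket check; no Spark dependency.
object BucketFileCheck {
  // Assumption: a bucketed file name carries "_<bucketId>" right before its
  // extension, e.g. "part-00000-abc_00002.c000.snappy.parquet".
  // This regex is illustrative, not copied from Spark.
  private val BucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  // Hypothetical helper standing in for BucketingUtils.getBucketId.
  def bucketIdOf(fileName: String): Option[Int] = fileName match {
    case BucketedFileName(id) => Some(id.toInt)
    case _                    => None
  }

  // True only when no bucket is backed by more than one file.
  // Files without a recognizable bucket id all fall into the same None group.
  def singleFilePerBucket(fileNames: Seq[String]): Boolean =
    fileNames.groupBy(bucketIdOf).forall { case (_, files) => files.length <= 1 }
}
```

With one file each for buckets 0 and 1 the check passes; as soon as a second
file lands in bucket 1 it fails, which is the case where the extra Sort shows
up in the plan above.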

Could you think of a way to solve this or bypass it?


