[ https://issues.apache.org/jira/browse/SPARK-24528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ohad Raviv updated SPARK-24528:
-------------------------------
Description:

Closely related to SPARK-24410, we're trying to optimize a very common use case we have: getting the most up-to-date row per id from a fact table. We save the table bucketed to skip the shuffle stage, but we still "waste" time on the Sort operator even though the data is already sorted.
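The windowing form of this "latest row per key" query hits the same extra Sort. A rough sketch (not our exact job), using the bucketed table a1 that is created in the example below:
{code:java}
// Rough illustration only, against the bucketed table a1 defined in the example below.
// The window needs the data clustered by key and sorted by (key, t1); if the
// bucketed scan does not expose its sort order, the planner adds a Sort node.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy("key").orderBy("t1")   // matches the table's sortBy("key", "t1")

sparkSession.table("a1")
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)   // keep one row per key
  .drop("rn")
  .explain()
{code}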
Here's a good example:
{code:java}
sparkSession.range(N).selectExpr(
    "id as key",
    "id % 2 as t1",
    "id % 3 as t2")
  .repartition(col("key"))
  .write
  .mode(SaveMode.Overwrite)
  .bucketBy(3, "key")
  .sortBy("key", "t1")
  .saveAsTable("a1")
{code}
{code:java}
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#24L], functions=[max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
+- SortAggregate(key=[key#24L], functions=[partial_max(named_struct(t1, t1#25L, key, key#24L, t1, t1#25L, t2, t2#26L))])
   +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: ...
{code}

And here's a bad, but more realistic, example (note the extra Sort between the scan and the aggregation):
{code:java}
sparkSession.sql("set spark.sql.shuffle.partitions=2")
sparkSession.sql("select max(struct(t1, *)) from a1 group by key").explain

== Physical Plan ==
SortAggregate(key=[key#32L], functions=[max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
+- SortAggregate(key=[key#32L], functions=[partial_max(named_struct(t1, t1#33L, key, key#32L, t1, t1#33L, t2, t2#34L))])
   +- *(1) Sort [key#32L ASC NULLS FIRST], false, 0
      +- *(1) FileScan parquet default.a1[key#32L,t1#33L,t2#34L] Batched: true, Format: Parquet, Location: ...
{code}

I've traced the problem to DataSourceScanExec#235:
{code:java}
val sortOrder = if (sortColumns.nonEmpty) {
  // In case of bucketing, its possible to have multiple files belonging to the
  // same bucket in a given relation. Each of these files are locally sorted
  // but those files combined together are not globally sorted. Given that,
  // the RDD partition will not be sorted even if the relation has sort columns set
  // Current solution is to check if all the buckets have a single file in it
  val files = selectedPartitions.flatMap(partition => partition.files)
  val bucketToFilesGrouping =
    files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
  val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
{code}

So the code explicitly avoids dealing with this situation for now. Could you think of a way to solve it or bypass it?
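One bypass we've been considering, in case it helps the discussion: rewriting the table so that every bucket holds exactly one file, which is the condition the code above checks for. This is only a sketch; it assumes that repartitioning into exactly numBuckets partitions on the bucket column uses the same hash that bucketBy uses to assign bucket ids, and the table name below is just for illustration:
{code:java}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val N = 1000000L  // N is left unspecified above; any size works for the sketch

// Sketch of a bypass, not a fix: write the data with exactly one shuffle
// partition per bucket so that each bucket ends up as a single file and the
// single-file check in DataSourceScanExec passes.
sparkSession.range(N).selectExpr(
    "id as key",
    "id % 2 as t1",
    "id % 3 as t2")
  .repartition(3, col("key"))     // exactly numBuckets partitions on the bucket column
  .write
  .mode(SaveMode.Overwrite)
  .bucketBy(3, "key")
  .sortBy("key", "t1")
  .saveAsTable("a1_compacted")    // illustrative table name

// With a single file per bucket the FileScan should report its sort order,
// and the Sort before the SortAggregate should disappear.
sparkSession.sql("set spark.sql.shuffle.partitions=2")
sparkSession.sql("select max(struct(t1, *)) from a1_compacted group by key").explain
{code}
Of course this only helps when we can afford to rewrite or compact the table, so a planner-side solution would still be much better.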
> Missing optimization for Aggregations/Windowing on a bucketed table
> -------------------------------------------------------------------
>
>                 Key: SPARK-24528
>                 URL: https://issues.apache.org/jira/browse/SPARK-24528
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Ohad Raviv
>            Priority: Major
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org