[ https://issues.apache.org/jira/browse/SPARK-40499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803735#comment-17803735 ]
Joey Pereira edited comment on SPARK-40499 at 1/6/24 1:12 AM:
--------------------------------------------------------------

Chiming in: I'm currently hitting a similar issue while doing upgrades from Spark 2.4 to Spark 3.1. In our case, we have an application with two queries that take {{percentile_approx}} of a fairly large dataset and reduce it to only a few rows (reading 37 billion rows and aggregating down to ~100 and ~7000 rows, with different groupBy keys respectively). This is roughly what the query looks like:

{code:sql}
select
  col1,
  percentile_approx(col2, 0.95) as q_95,
  percentile_approx(col2, 0.99) as q_99,
  percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}

I do recognize this could be written more optimally as {{percentile_approx(col2, array(0.95, 0.99))}} to save on computation; this is just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage takes roughly the same time (and produces roughly the same amount of data), but the shuffle read is drastically slower. Note that in this application the shuffle read is also collecting for a broadcast. The shuffle read went from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was after reducing the {{percentile_approx}} accuracy from the default (10000) to 100 on Spark 3.1. When left at 10000, the stage exceeded 15 hours and hit some of our application time limits.

*Spark 2.4*

!Screenshot 2024-01-05 at 3.53.10 PM.png!

- Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
- Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*

!Screenshot 2024-01-05 at 3.51.52 PM.png!

- Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 20.3 GiB shuffle write size
- Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Prior to digging further into this, I thought it may have been some AQE issue.
That was incorrect; in fact, the post-aggregation task/partition count was much worse without AQE, at about 150000.

I dug into the relevant changes between Spark 2.4 and 3.1, as well as changes since 3.1, to see if there's anything significant. I only looked at PRs against {{QuantileSummaries.scala}} and {{ApproximatePercentile.scala}}, and ignored docs, formatting, refactoring, and very simple behavioural changes. These are the notable changes, with some commentary.

For changes between 2.4 and 3.1:

# [https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543] which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. This seems potentially relevant, especially since the shuffle read, which is likely just executing merges, is what is struggling. That said, I would have expected this to also impact the partial_percentile_approx calculations.
# [https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3] -appears to change the behaviour in evaluation for {{case when}} / conditional inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. Our actual job is a little more complex and the SQL is over a subquery, but the subquery is not manipulating the column used for {{percentile_approx}}.- I misunderstood this change initially. It looks like an optimization to avoid reconstructing the data type for some parent plan nodes. Our SQL code does consume percentiles in some CaseWhen clauses, so I'll need to dig more into this.

I didn't go far while looking into other code. I did not see relevant changes in {{AggregatingAccumulator}}.

For changes from 3.1 onwards to {{ApproximatePercentile}} and {{QuantileSummaries}}:

# (in 3.2) [https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0] which appears to optimize the {{ApproxPercentiles.getPercentiles}} function.
It seems to only be executed on the final result collection, and it optimizes for cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}} - multi-value percentile calculations.
# (in 3.2) [https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3] which appears to change some plan interfaces. It reads as though it only impacted query compilation performance, rather than execution performance.

Because I'm not seeing any changes that might plausibly address this performance issue, I'm led to believe this is still an existing problem in Spark. I have some follow-ups I'll do to dig into this further, namely:

- Test the job with {{percentile}} to see if it performs better, although I'm a bit dubious we will get this to run with 37 billion input rows
- Test the job on Spark 3.3, which we have production-ready in our environment
- Observe some of the thread dumps / profiling for the stage with poor performance
- Rewrite or isolate the ApproximatePercentile from the CaseWhen to narrow down whether that change is related
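To make the suspicion about the {{merge}} re-implementation concrete, here is a toy, self-contained Python sketch of a mergeable quantile summary, loosely in the spirit of Spark's {{QuantileSummaries}} (Greenwald-Khanna). This is NOT Spark's algorithm - a real GK summary tracks per-entry rank-error bounds, and the {{ToySummary}} class here is entirely hypothetical - but it shows the shape of the work: map-side tasks build bounded summaries, and the shuffle-read side repeatedly merges and recompresses them per group, which is exactly the stage that regressed.

```python
import bisect

class ToySummary:
    """Bounded list of (value, weight) pairs; collapses neighbours when full."""

    def __init__(self, capacity=64):
        self.capacity = capacity
        self.entries = []  # sorted list of (value, weight)

    def insert(self, value):
        bisect.insort(self.entries, (value, 1))
        self._compress()

    def merge(self, other):
        # merge = concatenate both samples, re-sort, recompress;
        # this is the step executed per group on the shuffle-read side
        out = ToySummary(self.capacity)
        out.entries = sorted(self.entries + other.entries)
        out._compress()
        return out

    def _compress(self):
        while len(self.entries) > self.capacity:
            # collapse adjacent pairs: keep the upper value, sum the weights,
            # so cumulative weight at a kept value stays an exact rank count
            pairs = []
        # (a real GK compress would instead drop entries while respecting
        #  rank-error bounds; pairwise collapsing is a crude stand-in)
            it = iter(self.entries)
            for a in it:
                b = next(it, None)
                pairs.append(a if b is None else (b[0], a[1] + b[1]))
            self.entries = pairs

    def quantile(self, q):
        total = sum(w for _, w in self.entries)
        cum = 0
        for v, w in self.entries:
            cum += w
            if cum >= q * total:
                return v
        return self.entries[-1][0]

# two "map-side" partial summaries over disjoint slices of the data
s1, s2 = ToySummary(), ToySummary()
for v in range(1, 1000, 2):
    s1.insert(v)
for v in range(2, 1001, 2):
    s2.insert(v)
merged = s1.merge(s2)
print(merged.quantile(0.95))  # >= 950: this sketch upper-bounds the true quantile
```

With 67k map tasks feeding ~100 groups, each reducer performs tens of thousands of these merge-plus-recompress rounds per group, so even a constant-factor change in {{merge}} (like the commit above) is magnified at exactly the stage observed to regress.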
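The aside about rewriting the query as {{percentile_approx(col2, array(0.95, 0.99))}} can also be made concrete. With the array form, one summary is built per group and then queried at several quantiles, instead of summarizing the input once per call. A toy Python sketch (the {{SortedSample}} class is hypothetical illustration, not Spark code, and it keeps every value exactly rather than approximating):

```python
import bisect

class SortedSample:
    """Naive exact summary: keeps all values sorted (illustration only)."""

    def __init__(self):
        self.values = []
        self.inserts = 0  # counts the work done building the summary

    def insert(self, v):
        self.inserts += 1
        bisect.insort(self.values, v)

    def quantile(self, q):
        idx = min(len(self.values) - 1, int(q * len(self.values)))
        return self.values[idx]

data = list(range(1, 1001))

# two separate percentile_approx calls: the input is summarized twice
a, b = SortedSample(), SortedSample()
for v in data:
    a.insert(v)
    b.insert(v)
two_call_work = a.inserts + b.inserts

# array(0.95, 0.99) form: one summary, queried twice
s = SortedSample()
for v in data:
    s.insert(v)
one_call_work = s.inserts

p95, p99 = s.quantile(0.95), s.quantile(0.99)
print(one_call_work, two_call_work)  # half the summarization work
```

This only halves the aggregation-side work, so it does not explain a 15-minute-to-5.7-hour regression on its own; it's just the cheap mitigation noted in the comment.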
> Spark 3.2.1 percentlie_approx query much slower than Spark 2.4.0
> ----------------------------------------------------------------
>
>                 Key: SPARK-40499
>                 URL: https://issues.apache.org/jira/browse/SPARK-40499
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.2.1
>         Environment: hadoop: 3.0.0
>                      spark: 2.4.0 / 3.2.1
>                      shuffle: spark 2.4.0
>            Reporter: xuanzhiang
>            Priority: Major
>         Attachments: Screenshot 2024-01-05 at 3.51.52 PM.png, Screenshot 2024-01-05 at 3.53.10 PM.png, spark2.4-shuffle-data.png, spark3.2-shuffle-data.png
>
> spark.sql(
>   s"""
>      |SELECT
>      |  Info,
>      |  PERCENTILE_APPROX(cost,0.5) cost_p50,
>      |  PERCENTILE_APPROX(cost,0.9) cost_p90,
>      |  PERCENTILE_APPROX(cost,0.95) cost_p95,
>      |  PERCENTILE_APPROX(cost,0.99) cost_p99,
>      |  PERCENTILE_APPROX(cost,0.999) cost_p999
>      |FROM
>      |  textData
>      |""".stripMargin)
> * When we used Spark 2.4.0, aggregation adopted objHashAggregator and stage 2 pulled shuffle data very quickly. But when we use Spark 3.2.1 with the old shuffle, 140M of shuffle data costs 3 hours.
> * If we upgrade the shuffle, will we get a performance regression?