[ https://issues.apache.org/jira/browse/SPARK-46706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joey Pereira updated SPARK-46706:
---------------------------------
Summary: percentile_approx regression since Spark 2.4 (was: percentile_approx regression between at least Spark 3.1)

percentile_approx regression since Spark 2.4
--------------------------------------------

Key: SPARK-46706
URL: https://issues.apache.org/jira/browse/SPARK-46706
Project: Spark
Issue Type: Bug
Components: Shuffle, SQL
Affects Versions: 3.1.3
Reporter: Joey Pereira
Priority: Major
Attachments: Screenshot 2024-01-05 at 3.51.52 PM.png, Screenshot 2024-01-05 at 3.53.10 PM.png

This was originally reported in SPARK-40499; I'm opening a new issue because that one was closed. It's not entirely clear whether this regression exists on HEAD, but I haven't seen any changes that I would expect to address it. Here's what I reported in the other ticket.
---
Chiming in: I'm currently hitting a similar issue while upgrading from Spark 2.4 to Spark 3.1.

In our case, we have an application with two queries that take {{percentile_approx}} of a fairly large dataset and reduce it to only a few rows (reading 37 billion rows and aggregating to ~100 and ~7000 rows respectively, with different groupBy keys). This is roughly what the query looks like:
{code:sql}
select
  col1,
  percentile_approx(col2, 0.95) as q_95,
  percentile_approx(col2, 0.99) as q_99,
  percentile_approx(col2, 0.99) - percentile_approx(col2, 0.95) as q_diff
from data
group by 1
{code}
Also, I do recognize this could be written more optimally as {{percentile_approx(col2, array(0.95, 0.99))}} to save on computation; this is just the snippet as it was when I stumbled on it :)

We've observed that the shuffle write stage takes roughly the same time (and writes roughly the same amount of data), but the shuffle read is drastically slower. Note that in this application the shuffle read is also collecting for a broadcast.
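For comparison, here is a sketch of the single-call form mentioned above, using the placeholder {{data}}, {{col1}} and {{col2}} names from the query. {{percentile_approx}} accepts an array of percentages and returns an array of values in the same order, so one aggregation buffer per group serves both quantiles instead of one buffer per distinct call. The optional third argument is the accuracy (default 10000, where 1.0/accuracy is the relative error); it is written out here only to show where it goes. The alias {{per_group}} is illustrative, and this rewrite has not been measured against the regression described in this issue.

{code:sql}
-- Sketch: one percentile_approx call returning both quantiles.
-- "data", "col1", "col2" are the placeholder names from the query above;
-- "per_group" is an illustrative alias.
select
  col1,
  qs[0] as q_95,          -- array subscripts are 0-based
  qs[1] as q_99,
  qs[1] - qs[0] as q_diff
from (
  select
    col1,
    -- array of percentages in, array of values out (same order);
    -- 10000 is the default accuracy, written out explicitly
    percentile_approx(col2, array(0.95, 0.99), 10000) as qs
  from data
  group by col1
) as per_group
{code}

Lowering the accuracy, as was done on Spark 3.1, would only change the last argument (for example, to 100).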
The shuffle read went from 15 minutes on Spark 2.4 to 5.7 hours on Spark 3.1, and that was after reducing the {{percentile_approx}} accuracy from the default (10000) to 100 on Spark 3.1. When left at 10000, the stage exceeded 15 hours and hit some of our application time limits.

*Spark 2.4*
!Screenshot 2024-01-05 at 3.53.10 PM.png!
- Stage 2 (shuffle write): 11 minutes, 67647 tasks, 22 GiB shuffle write size
- Stage 9 (shuffle read): 15 minutes, 396 tasks, 22 GiB shuffle read size

*Spark 3.1*
!Screenshot 2024-01-05 at 3.51.52 PM.png!
- Stage 1 (shuffle write): 7.6 minutes, 67396 tasks, 178.7 GiB input size, 20.3 GiB shuffle write size
- Stage 5 (shuffle read): 5.7 hours, 1331 tasks, 20.3 GiB shuffle read size

Before digging further, I thought this might be an AQE issue. That was incorrect; in fact, the post-aggregation task/partition count was much worse without AQE, at about 150000.

I looked for relevant changes between Spark 2.4 and 3.1, as well as changes since 3.1, to see if there's anything significant. I only looked at PRs against {{QuantileSummaries.scala}} and {{ApproximatePercentile.scala}}, and ignored docs, formatting, refactoring, and very simple behavioural changes. These are the notable changes, with some commentary.

For changes between 2.4 and 3.1:
# [https://github.com/apache/spark/commit/de360e96d776f51c0fd7c31dcec142feabf3d543], which re-implements the QuantileSummaries {{merge}} algorithm rather deeply. This seems potentially relevant, especially since the struggling stage is the shuffle read, which is likely mostly executing merges. That said, I might have expected this to also impact the partial_percentile_approx calculations.
# [https://github.com/apache/spark/commit/023eb482b23b5d63d2157b3def9926673844e0a3] -appears to change the behaviour in evaluation for {{case when}} / conditional inputs. This is possibly relevant, but our SQL code only plumbs a primitive in. Our actual job is a little more complex and the SQL is over a subquery, but the subquery does not manipulate the column used for {{percentile_approx}}.- I misunderstood this change initially. It looks like an optimization to avoid reconstructing the data type for some parent plan nodes. Our SQL code does consume percentiles in some CaseWhen clauses, so I'll need to dig more into this.

I didn't go far while looking into other code. I did not see relevant changes in {{AggregatingAccumulator}}.

For changes from 3.1 onwards to {{ApproximatePercentile}} and {{QuantileSummaries}}:
# (in 3.2) [https://github.com/apache/spark/commit/6f8c62047cea125d52af5dad7fb5ad3eadb7f7d0], which appears to optimize the {{ApproxPercentiles.getPercentiles}} function. It seems to only be executed on the final result collection and optimizes for cases like {{percentile_approx(..., array(0.5, 0.9, 0.95), ...)}}, i.e. multiple-value percentile calculations.
# (in 3.2) [https://github.com/apache/spark/commit/0945baf90660a101ae0f86a39d4c91ca74ae5ee3], which appears to change some plan interfaces. It reads as though it only impacts query compilation performance, rather than execution performance.

Because I'm not seeing any changes that could plausibly address this performance issue, I'm led to believe this is still an existing problem in Spark.
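One way to follow up on the CaseWhen question, sketched under the assumption of a Spark SQL session and the placeholder {{data}} table from the first query: compute and cache the percentiles on their own, then apply the {{case when}} logic to the cached result. If the aggregation step alone is still slow, the CaseWhen consumer is unlikely to be the cause. The table name {{percentiles_only}} is hypothetical, and the two statements are meant to be run one after the other (for example, one {{spark.sql}} call each).

{code:sql}
-- Step 1: run only the aggregation, with no CaseWhen consumer, and cache it.
-- CACHE TABLE without LAZY materializes the result immediately, so this
-- step's stages can be timed on their own.
-- "percentiles_only" is a hypothetical name; "data", "col1", "col2" are placeholders.
CACHE TABLE percentiles_only AS
select
  col1,
  percentile_approx(col2, 0.95) as q_95,
  percentile_approx(col2, 0.99) as q_99
from data
group by col1;

-- Step 2: apply the CaseWhen logic to the already-computed percentiles.
-- (q_99 - q_95 is the q_diff column from the original query.)
select
  col1,
  case
    when q_99 - q_95 > 5 then q_95 + 5
    else q_99
  end as calculated_val
from percentiles_only;
{code}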
I have some follow-up I'll do to dig into this further, namely:
- Test the job with {{percentile}} to see if it performs better, although I'm a bit dubious we will get this to run with 37 billion input rows
- Test the job on Spark 3.3, which we have production-ready in our environment
- Observe some of the thread dumps / profiling for the stage with poor performance
- Isolate, or otherwise rewrite, the ApproximatePercentile going into a CaseWhen in our specific SQL to narrow down whether that change is related

Here's an expanded version of our query, with irrelevant columns removed. I figured I'd include this after realizing the relevance of the CaseWhen parent node.
{code:sql}
with source_data as (
  select col1, col2 -- ... additional columns
  from <table>
), percentiles as (
  select
    col1,
    approx_percentile(col2, 0.95) as q_95,
    approx_percentile(col2, 0.99) as q_99,
    approx_percentile(col2, 0.99) - approx_percentile(col2, 0.95) as q_diff
  from source_data
  group by 1
), calculations as (
  select
    col1,
    case
      when q_diff > 5 then q_95 + 5
      else q_99
    end as calculated_val
  from percentiles
)
select
  /*+ BROADCAST (c) */
  source_data.*, c.calculated_val
from source_data
left join calculations as c using(col1)
-- ... additional joins / broadcasts
{code}
---
Later on, I found another app of ours with similarly unbearable regressions. The app previously ran in under 2 hours before the upgrade, and now takes more than 24 hours on Spark 3. It's a lot simpler, so this is more or less the SQL it runs (with a number of other internal group-by fields removed for brevity).
{code:sql}
with t as (
  select
    method,
    response_code,
    if(response_code >= 200 and response_code < 300, duration_ms, null) as duration_ms_2xx
  from logs
)
select
  method,
  count(*) as cnt,
  sum(case when response_code >= 200 and response_code < 300 then 1 else 0 end) as cnt_2xx,
  sum(case when response_code >= 300 and response_code < 400 then 1 else 0 end) as cnt_3xx,
  sum(case when response_code >= 400 and response_code < 500 then 1 else 0 end) as cnt_4xx,
  sum(case when response_code >= 500 then 1 else 0 end) as cnt_5xx,
  approx_percentile(duration_ms_2xx, 0.5) as latency_p50,
  approx_percentile(duration_ms_2xx, 0.9) as latency_p90,
  approx_percentile(duration_ms_2xx, 0.99) as latency_p99,
  approx_percentile(duration_ms_2xx, 0.999) as latency_p999
from t
group by 1
{code}
This one has a conditional input into {{approx_percentile}}, so is it possible that the change related to conditional inputs is still a factor?
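To test whether the conditional input matters, here is a sketch of the same latency aggregation with the 2xx condition moved from {{if(...)}} into an aggregate {{FILTER (WHERE ...)}} clause, which Spark 3.x SQL supports, so {{approx_percentile}} reads the raw {{duration_ms}} column. Rows excluded by the filter are skipped, just as the nulls produced by {{if(...)}} are ignored, so the results should match. Only the latency columns are shown, using the {{logs}}, {{method}}, {{response_code}} and {{duration_ms}} names from the query above. Whether Spark executes the FILTER differently from the {{if(...)}} form internally is an assumption to check, for example by comparing the two {{EXPLAIN}} outputs.

{code:sql}
-- Sketch: same grouping, but the 2xx condition is an aggregate FILTER
-- clause instead of an if(...) expression feeding approx_percentile.
select
  method,
  approx_percentile(duration_ms, 0.5)
    filter (where response_code >= 200 and response_code < 300) as latency_p50,
  approx_percentile(duration_ms, 0.9)
    filter (where response_code >= 200 and response_code < 300) as latency_p90,
  approx_percentile(duration_ms, 0.99)
    filter (where response_code >= 200 and response_code < 300) as latency_p99,
  approx_percentile(duration_ms, 0.999)
    filter (where response_code >= 200 and response_code < 300) as latency_p999
from logs
group by method
{code}

Changing one variable at a time (FILTER here, the array form separately) keeps the comparison against the original query easy to interpret.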