Emanuel Velzi created SPARK-44156:
-------------------------------------

             Summary: Should HashAggregation improve dropDuplicates()?
                 Key: SPARK-44156
                 URL: https://issues.apache.org/jira/browse/SPARK-44156
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.3.2
            Reporter: Emanuel Velzi
TL;DR: SortAggregate makes dropDuplicates slower than HashAggregate. How can we make Spark use HashAggregate instead of SortAggregate?
----------------------

We have a Spark cluster running on Kubernetes with the following configuration:
* Spark v3.3.2
* Hadoop 3.3.4
* Java 17

We are running a simple job on a dataset (~6 GiB) with almost 600 columns, many of which contain null values. The job involves the following steps:
# Load data from S3.
# Apply dropDuplicates().
# Save the deduplicated data back to S3 using magic committers.

One of the columns is of type "map". When we run dropDuplicates() without specifying any columns (i.e., using all columns), it throws an error:
{noformat}
Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.), but the type of column my_column is map<string,array<struct<ci:string,co:string,cur:string,Getaways_Zone:string,Getaways_ID:string>>>;
{noformat}

To work around this, we used dropDuplicates("id"), specifying a single identifier column. However, this performed {*}much worse than expected{*}, taking around 30 minutes.

As an alternative approach, we tried converting the "map" column to JSON, applying dropDuplicates() without arguments, and then converting the column back to "map" format:
{code:java}
// Remember the original map type so it can be restored after deduplication.
DataType t = ds.schema().apply("my_column").dataType();
ds = ds.withColumn("my_column", functions.to_json(ds.col("my_column")));
ds = ds.dropDuplicates();
ds = ds.withColumn("my_column", functions.from_json(ds.col("my_column"), t));
{code}

Surprisingly, this approach {*}significantly improved the performance{*}, reducing the execution time to 7 minutes. The only noticeable difference was in the execution plan: the *slower* run used {*}SortAggregate{*}, while the *faster* run used {*}HashAggregate{*}.
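As a hedged aside (not part of the original report): the same JSON round-trip can be generalized to every map-typed column found in the schema, so dropDuplicates() can still compare all columns. The helper below is a sketch under that assumption; the class name MapSafeDedup and the Dataset<Row> parameter are illustrative, not from the report.

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;

public final class MapSafeDedup {

    // Round-trip every map-typed column through JSON so that
    // dropDuplicates() can be called without arguments (all columns).
    public static Dataset<Row> dropDuplicatesWithMaps(Dataset<Row> ds) {
        // Remember each map column's original type so it can be restored.
        Map<String, DataType> mapTypes = new HashMap<>();
        for (StructField f : ds.schema().fields()) {
            if (f.dataType() instanceof MapType) {
                mapTypes.put(f.name(), f.dataType());
                ds = ds.withColumn(f.name(), functions.to_json(ds.col(f.name())));
            }
        }
        ds = ds.dropDuplicates();
        // Convert the JSON strings back to their original map types.
        for (Map.Entry<String, DataType> e : mapTypes.entrySet()) {
            ds = ds.withColumn(e.getKey(),
                    functions.from_json(ds.col(e.getKey()), e.getValue()));
        }
        return ds;
    }
}
{code}

One caveat with this approach: comparing maps by their JSON form assumes to_json produces the same string for logically equal maps; if key order can differ between rows, equal maps may not be deduplicated.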
{noformat}
== Physical Plan [slow case] ==
Execute InsertIntoHadoopFsRelationCommand (13)
+- AdaptiveSparkPlan (12)
   +- == Final Plan ==
      Coalesce (8)
      +- SortAggregate (7)
         +- Sort (6)
            +- ShuffleQueryStage (5), Statistics(sizeInBytes=141.3 GiB, rowCount=1.25E+7)
               +- Exchange (4)
                  +- SortAggregate (3)
                     +- Sort (2)
                        +- Scan parquet (1)
   +- == Initial Plan ==
      Coalesce (11)
      +- SortAggregate (10)
         +- Sort (9)
            +- Exchange (4)
               +- SortAggregate (3)
                  +- Sort (2)
                     +- Scan parquet (1)
{noformat}
{noformat}
== Physical Plan [fast case] ==
Execute InsertIntoHadoopFsRelationCommand (11)
+- AdaptiveSparkPlan (10)
   +- == Final Plan ==
      Coalesce (7)
      +- HashAggregate (6)
         +- ShuffleQueryStage (5), Statistics(sizeInBytes=81.6 GiB, rowCount=1.25E+7)
            +- Exchange (4)
               +- HashAggregate (3)
                  +- Project (2)
                     +- Scan parquet (1)
   +- == Initial Plan ==
      Coalesce (9)
      +- HashAggregate (8)
         +- Exchange (4)
            +- HashAggregate (3)
               +- Project (2)
                  +- Scan parquet (1)
{noformat}

Based on this observation, we concluded that the performance difference comes down to {*}SortAggregate versus HashAggregate{*}. Is this line of thinking correct? How can we enforce the use of HashAggregate instead of SortAggregate?

*The final result is somewhat counterintuitive*: deduplicating on a single column should in theory be faster, since it gives Spark a simpler way to compare rows and detect duplicates.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org