Emanuel Velzi created SPARK-44156:
-------------------------------------

             Summary: Should HashAggregation improve dropDuplicates()?
                 Key: SPARK-44156
                 URL: https://issues.apache.org/jira/browse/SPARK-44156
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.3.2
            Reporter: Emanuel Velzi


TL;DR: SortAggregate makes dropDuplicates() slower than HashAggregate.

How can we make Spark use HashAggregate instead of SortAggregate?

----------------------

We have a Spark cluster running on Kubernetes with the following configurations:
 * Spark v3.3.2
 * Hadoop 3.3.4
 * Java 17

We are running a simple job on a dataset (~6 GiB) with almost 600 columns, many
of which contain null values. The job involves the following steps:
 # Load data from S3.
 # Apply dropDuplicates().
 # Save the deduplicated data back to S3 using magic committers.

One of the columns is of type "map". When we run dropDuplicates() without 
specifying any parameters (i.e., using all columns), it throws an error:

 
{noformat}
Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.), but the type of column my_column is map<string,array<struct<ci:string,co:string,cur:string,Getaways_Zone:string,Getaways_ID:string>>>;{noformat}
 

To overcome this issue, we used dropDuplicates("id"), i.e., deduplicating on a 
single identifier column.

However, the performance of this method was {*}much worse than expected{*}, 
taking around 30 minutes.

As an alternative approach, we tested converting the "map" column to JSON, 
applying dropDuplicates() without parameters, and then converting the column 
back to "map" format:

 
{code:java}
// Remember the original map type, serialize the column to JSON,
// deduplicate on all columns, then restore the original type.
DataType t = ds.schema().apply("my_column").dataType();
ds = ds.withColumn("my_column", functions.to_json(ds.col("my_column")));
ds = ds.dropDuplicates();
ds = ds.withColumn("my_column", functions.from_json(ds.col("my_column"), t));
{code}
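One caveat with this round-trip, sketched below in plain Java (class and method names here are hypothetical, not Spark API): deduplicating on a serialized form only works if equal maps always serialize to the same string. Sorting the keys first gives a canonical rendering, so two maps that differ only in entry order collapse to one dedup key:

{code:java}
import java.util.*;

public class MapDedup {
    // Render a map with its keys sorted so that equal maps always
    // produce the same string, regardless of insertion order.
    static String canonical(Map<String, String> m) {
        return new TreeMap<>(m).toString();
    }

    public static void main(String[] args) {
        Map<String, String> a = new LinkedHashMap<>();
        a.put("k1", "v1");
        a.put("k2", "v2");

        Map<String, String> b = new LinkedHashMap<>();
        b.put("k2", "v2"); // same entries, different insertion order
        b.put("k1", "v1");

        Set<String> seen = new HashSet<>();
        System.out.println(seen.add(canonical(a))); // true: first occurrence kept
        System.out.println(seen.add(canonical(b))); // false: recognized as duplicate
    }
}
{code}

Whether to_json produces such a canonical form for a map column may depend on how the maps were built upstream; if entry order can vary between otherwise-equal rows, some duplicates could survive the dedup.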
 

Surprisingly, this approach {*}significantly improved the performance{*}, 
reducing the execution time to 7 minutes.

The only noticeable difference was in the execution plan. In the *slower* case, 
the execution plan involved {*}SortAggregate{*}, while in the *faster* case, it 
involved {*}HashAggregate{*}.

 
{noformat}
== Physical Plan [slow case] == 
Execute InsertIntoHadoopFsRelationCommand (13)
+- AdaptiveSparkPlan (12)
   +- == Final Plan ==
      Coalesce (8)
      +- SortAggregate (7)
         +- Sort (6)
            +- ShuffleQueryStage (5), Statistics(sizeInBytes=141.3 GiB, rowCount=1.25E+7)
               +- Exchange (4)
                  +- SortAggregate (3)
                     +- Sort (2)
                        +- Scan parquet  (1)
   +- == Initial Plan ==
      Coalesce (11)
      +- SortAggregate (10)
         +- Sort (9)
            +- Exchange (4)
               +- SortAggregate (3)
                  +- Sort (2)
                     +- Scan parquet  (1)
{noformat}
 
{noformat}
== Physical Plan [fast case] ==
Execute InsertIntoHadoopFsRelationCommand (11)
+- AdaptiveSparkPlan (10)
   +- == Final Plan ==
      Coalesce (7)
      +- HashAggregate (6)
         +- ShuffleQueryStage (5), Statistics(sizeInBytes=81.6 GiB, rowCount=1.25E+7)
            +- Exchange (4)
               +- HashAggregate (3)
                  +- Project (2)
                     +- Scan parquet  (1)
   +- == Initial Plan ==
      Coalesce (9)
      +- HashAggregate (8)
         +- Exchange (4)
            +- HashAggregate (3)
               +- Project (2)
                  +- Scan parquet  (1)
{noformat}
 
Based on this observation, we concluded that the performance difference comes 
down to {*}SortAggregate versus HashAggregate{*}.

Is this line of thinking correct? How can we enforce the use of HashAggregate 
instead of SortAggregate?

*The final result is somewhat counterintuitive*, because deduplicating on a 
single column should in theory be faster: it gives a simpler way to compare 
rows and detect duplicates.
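For intuition, here is a toy plain-Java analogy (not Spark internals; the Row type and method names are made up). Deduplicating by one key and deduplicating on the whole row are both single-pass, hash-based operations, so key width alone would not explain a 4x gap; the extra sort that SortAggregate performs is the more plausible cost:

{code:java}
import java.util.*;

public class DedupSketch {
    // Hypothetical row: an id plus one payload column.
    record Row(String id, String payload) {}

    // Dedup by a single key column: keep the first row seen per id
    // (mirrors dropDuplicates("id") semantics).
    static List<Row> dedupById(List<Row> rows) {
        Map<String, Row> seen = new LinkedHashMap<>();
        for (Row r : rows) seen.putIfAbsent(r.id(), r);
        return new ArrayList<>(seen.values());
    }

    // Dedup on all columns: hash the whole row
    // (mirrors dropDuplicates() with no arguments).
    static List<Row> dedupAll(List<Row> rows) {
        Set<Row> seen = new LinkedHashSet<>();
        List<Row> out = new ArrayList<>();
        for (Row r : rows) if (seen.add(r)) out.add(r);
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("a", "x"), new Row("a", "y"), new Row("b", "x"));
        System.out.println(dedupById(rows).size()); // 2
        System.out.println(dedupAll(rows).size());  // 3
    }
}
{code}

Both variants are O(n) hash lookups; neither needs to sort its input, which is consistent with the HashAggregate plan being the faster one.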

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
