[ https://issues.apache.org/jira/browse/SPARK-22223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199831#comment-16199831 ]
Michele Costantino Soccio commented on SPARK-22223:
---------------------------------------------------

[~maropu] Not sure I understand your observation. With the hash-based aggregate implementation, Spark repartitions and shuffles at each {{groupBy}}; with the sort-based implementation, it does no repartition and no shuffle. I am happy to leave the choice between sort-based and hash-based aggregation to Spark, but I do not want Spark to shuffle and repartition when there is no need for it.

> ObjectHashAggregate introduces unnecessary shuffle
> --------------------------------------------------
>
>                 Key: SPARK-22223
>                 URL: https://issues.apache.org/jira/browse/SPARK-22223
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.2.0
>        Environment: Spark 2.2.0 and following.
> {{spark.sql.execution.useObjectHashAggregateExec = true}}
>            Reporter: Michele Costantino Soccio
>
> Since Spark 2.2, {{groupBy}} plus {{collect_list}} makes use of an unnecessary shuffle when the partitioning from the previous step is based on looser criteria than the current {{groupBy}} keys.
> For example:
> {code:java}
> // Sample data from
> // https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
> // Read the data and repartition by "Country".
> val retailDF = spark.sql("Select * from online_retail")
>   .repartition(col("Country"))
> // Group the data and collect.
> val aggregatedDF = retailDF
>   .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
>   .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
>   .agg(collect_list("Good").as("Goods"))
>   .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
>   .groupBy("Country", "CustomerID")
>   .agg(collect_list("Invoice").as("Invoices"))
>   .withColumn("Customer", expr("(CustomerID, Invoices)"))
>   .groupBy("Country")
>   .agg(collect_list("Customer").as("Customers"))
> {code}
> Without disabling {{ObjectHashAggregate}}, one gets the following physical plan:
> {noformat}
> == Physical Plan ==
> ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310])
> +- Exchange hashpartitioning(Country#14, 200)
>    +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
>       +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>          +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>             +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
>                +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>                   +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                      +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                         +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
>                            +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                               +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                                  +- Exchange hashpartitioning(Country#14, 200)
>                                     +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
> With Spark 2.1.0, or when {{ObjectHashAggregate}} is disabled, one gets a more efficient:
> {noformat}
> == Physical Plan ==
> SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310])
> +- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
>    +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>       +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>          +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>             +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                   +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                      +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0
>                         +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                            +- Exchange hashpartitioning(Country#14, 200)
>                               +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
> In this example, a quick run in a Databricks notebook showed that manually disabling {{ObjectHashAggregate}} brings the execution time to around 16 s, versus the 25 s needed when {{ObjectHashAggregate}} is enabled.
> The use of {{ObjectHashAggregate}} in {{groupBy}} was introduced with SPARK-17949.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
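For reference, a minimal sketch of the workaround implied by the Environment field above: turn off the object-hash path via the session configuration before the DataFrame is planned, so Spark falls back to the sort-based aggregate. The flag is the one quoted in the issue ({{spark.sql.execution.useObjectHashAggregateExec}}); it is an internal setting, so its behavior may change between Spark versions, and {{retailDF}} here is the DataFrame from the example above.

{code:java}
// Sketch, not an endorsed fix: disable ObjectHashAggregateExec for this
// session, then rebuild the aggregation so the flag is seen at planning time.
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")

val aggregatedDF = retailDF
  .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
  .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
  .agg(collect_list("Good").as("Goods"))

// Inspect the physical plan; it should now show SortAggregate nodes
// instead of ObjectHashAggregate, as in the second plan above.
aggregatedDF.explain()
{code}

Note that the flag must be set before the DataFrame is constructed, because the physical plan is chosen when the query is planned, not when the action runs.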