[ 
https://issues.apache.org/jira/browse/SPARK-24025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16445512#comment-16445512
 ] 

Jacek Laskowski commented on SPARK-24025:
-----------------------------------------

I was about to close this as a duplicate, but I'm not so sure anymore. 
Reading the following in SPARK-17570:
{quote}If the number of buckets in the output table is a factor of the buckets 
in the input table, we should be able to avoid `Sort` and `Exchange` and 
directly join those.
{quote}
The two issues may not be perfectly identical, but they're close 
"relatives". I think the next step would be to write a test case that 
reproduces this issue. The fairly simple fix would then be a physical 
optimization (rule) that eliminates the extra Exchange and Sort operators. 
Who'd help me here?
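
A minimal sketch of what such a test could check, meant for spark-shell after 
running the snippet from the description (so `q` is the joined Dataset). It 
assumes the Spark 2.3.x operator class names `ShuffleExchangeExec` and 
`SortExec`; the helper `countExchangesAndSorts` is a hypothetical name, not 
existing Spark API, and the expected count of one exchange is my assumption 
about what a fix should produce.

```scala
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical helper: counts shuffle exchanges and sorts in a physical plan.
def countExchangesAndSorts(plan: SparkPlan): (Int, Int) = {
  val exchanges = plan.collect { case e: ShuffleExchangeExec => e }.size
  val sorts = plan.collect { case s: SortExec => s }.size
  (exchanges, sorts)
}

// `q` is the join from the issue description.
val (exchanges, sorts) = countExchangesAndSorts(q.queryExecution.executedPlan)
println(s"exchanges=$exchanges, sorts=$sorts")

// The plan in the description shows two Exchange nodes on the non-bucketed
// side. Assumption: with a fix in place only one shuffle should remain,
// so this assertion is expected to fail on affected versions.
assert(exchanges == 1, s"Expected a single Exchange, found $exchanges")
```

Counting operators rather than comparing plan strings keeps the check 
independent of expression IDs such as `id#79L`, which change between runs.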

> Join of bucketed and non-bucketed tables can give two exchanges and sorts for 
> non-bucketed side
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24025
>                 URL: https://issues.apache.org/jira/browse/SPARK-24025
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: {code:java}
> ./bin/spark-shell --version
> Welcome to
> ____ __
> / __/__ ___ _____/ /__
> _\ \/ _ \/ _ `/ __/ '_/
> /___/ .__/\_,_/_/ /_/\_\ version 2.3.0
> /_/
> Using Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_162
> Branch master
> Compiled by user sameera on 2018-02-22T19:24:29Z
> Revision a0d7949896e70f427e7f3942ff340c9484ff0aab
> Url g...@github.com:sameeragarwal/spark.git
> Type --help for more information.{code}
>            Reporter: Jacek Laskowski
>            Priority: Major
>         Attachments: join-jira.png
>
>
> While exploring bucketing I found the following join query of non-bucketed 
> and bucketed tables that ends up with two exchanges and two sorts in the 
> physical plan for the non-bucketed join side.
> {code}
> // Make sure that you don't end up with a BroadcastHashJoin and a
> // BroadcastExchange
> // Disable auto broadcasting
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
> val bucketedTableName = "bucketed_4_id"
> val large = spark.range(1000000)
> large.write
>   .bucketBy(4, "id")
>   .sortBy("id")
>   .mode("overwrite")
>   .saveAsTable(bucketedTableName)
> // Describe the table and include bucketing spec only
> val descSQL = sql(s"DESC FORMATTED $bucketedTableName")
>   .filter($"col_name".contains("Bucket") || $"col_name" === "Sort Columns")
> scala> descSQL.show(truncate = false)
> +--------------+---------+-------+
> |col_name      |data_type|comment|
> +--------------+---------+-------+
> |Num Buckets   |4        |       |
> |Bucket Columns|[`id`]   |       |
> |Sort Columns  |[`id`]   |       |
> +--------------+---------+-------+
> val bucketedTable = spark.table(bucketedTableName)
> val t1 = spark.range(4)
>   .repartition(2, $"id")  // Use just 2 partitions
>   .sortWithinPartitions("id") // sort partitions
> val q = t1.join(bucketedTable, "id")
> // Note two exchanges and sorts
> scala> q.explain
> == Physical Plan ==
> *(5) Project [id#79L]
> +- *(5) SortMergeJoin [id#79L], [id#77L], Inner
>    :- *(3) Sort [id#79L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#79L, 4)
>    :     +- *(2) Sort [id#79L ASC NULLS FIRST], false, 0
>    :        +- Exchange hashpartitioning(id#79L, 2)
>    :           +- *(1) Range (0, 4, step=1, splits=8)
>    +- *(4) Sort [id#77L ASC NULLS FIRST], false, 0
>       +- *(4) Project [id#77L]
>          +- *(4) Filter isnotnull(id#77L)
>             +- *(4) FileScan parquet default.bucketed_4_id[id#77L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
> q.foreach(_ => ())
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

