[ https://issues.apache.org/jira/browse/SPARK-19122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15808485#comment-15808485 ]

Tejas Patil commented on SPARK-19122:
-------------------------------------

[~hvanhovell] : At a broader level, I see that when the nodes for physical 
operators are created, the `requiredChildDistribution` and 
`requiredChildOrdering` are pretty much fixed at that point. By the time 
planning is done and the child nodes can be inspected, there is no way to 
change them.

To fix this, my take would be:
* Allow `requiredChildDistribution` and `requiredChildOrdering` to be decided 
based on the operator's children, i.e. change `requiredChildOrdering()` to 
`requiredChildOrdering(childOrderings)`. The default behavior would be to 
ignore the input `childOrderings` (a sketch follows this list).
* For operators like `SortMergeJoinExec`, where the distribution and ordering 
requirements are much stricter, the operator could itself decide which 
ordering to use. The table below enumerates the cases, and a small 
decision-rule sketch follows it.
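
A rough sketch of what the proposed signature change could look like is below. `AdaptiveRequirements` and `fixedChildOrdering` are made-up names for illustration only; this is not the actual `SparkPlan` API.

{code}
import org.apache.spark.sql.catalyst.expressions.SortOrder

// Rough sketch only -- not the actual SparkPlan API.
trait AdaptiveRequirements {
  // The existing, fixed requirement (one Seq[SortOrder] per child).
  def fixedChildOrdering: Seq[Seq[SortOrder]]

  // Proposed: the operator sees the orderings its children already
  // provide and may adapt its requirement to avoid a redundant sort.
  def requiredChildOrdering(
      childOrderings: Seq[Seq[SortOrder]]): Seq[Seq[SortOrder]] =
    fixedChildOrdering // default: ignore childOrderings, keep old behavior
}
{code}

An operator like `SortMergeJoinExec` would override `requiredChildOrdering(childOrderings)` to inspect what the children provide before settling on a requirement.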

||child output ordering||ordering of join keys in query||shuffle+sort needed||
| a, b | a, b | No |
| a, b | b, a | No |
| a, b, c, d | a, b | No |
| a, b, c, d | b, c | Yes |
| a, b | a, b, c, d | Yes |
| b, c | a, b, c, d | Yes |
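
The rule behind the table can be stated compactly: the shuffle+sort is avoidable iff the join keys can be reordered to match exactly the leading prefix of the child's output ordering. A self-contained sketch of that decision rule (a hypothetical helper over plain column names, not Spark code):

{code}
// Hypothetical illustration of the rule in the table above. Returns the
// reordered join keys if the shuffle+sort can be avoided, else None.
def reorderedKeys(childOrdering: Seq[String],
                  joinKeys: Seq[String]): Option[Seq[String]] = {
  if (joinKeys.length > childOrdering.length) {
    None // more join keys than sorted columns: must shuffle+sort
  } else {
    val prefix = childOrdering.take(joinKeys.length)
    // Avoidable iff the join keys are exactly the leading sort columns
    // (in any order); reordering them to `prefix` lines both sides up.
    if (prefix.toSet == joinKeys.toSet) Some(prefix) else None
  }
}

reorderedKeys(Seq("a", "b"), Seq("b", "a"))           // Some(List(a, b)) -> no shuffle+sort
reorderedKeys(Seq("a", "b", "c", "d"), Seq("b", "c")) // None -> shuffle+sort needed
reorderedKeys(Seq("a", "b"), Seq("a", "b", "c", "d")) // None -> shuffle+sort needed
{code}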

Even SPARK-17271 would benefit from this change. Let me know if you have any 
opinion on this approach, or have something better in mind.


> Unnecessary shuffle+sort added if join predicates ordering differ from 
> bucketing and sorting order
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19122
>                 URL: https://issues.apache.org/jira/browse/SPARK-19122
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2, 2.1.0
>            Reporter: Tejas Patil
>
> `table1` and `table2` are sorted and bucketed on columns `j` and `k` (in 
> that order). This is how they are generated:
> {code}
> val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
> df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table1")
> df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table2")
> {code}
> Now, if the join predicates are specified in the query in the *same* order 
> as the bucketing and sort order, there is no shuffle or sort:
> {code}
> scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.j=b.j AND a.k=b.k").explain(true)
> == Physical Plan ==
> *SortMergeJoin [j#61, k#62], [j#100, k#101], Inner
> :- *Project [i#60, j#61, k#62]
> :  +- *Filter (isnotnull(k#62) && isnotnull(j#61))
> :     +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
> +- *Project [i#99, j#100, k#101]
>    +- *Filter (isnotnull(j#100) && isnotnull(k#101))
>       +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
> {code}
> The same query with the join predicates in a *different* order from the 
> bucketing and sort order introduces an extra shuffle and sort:
> {code}
> scala> hc.sql("SELECT * FROM table1 a JOIN table2 b ON a.k=b.k AND a.j=b.j").explain(true)
> == Physical Plan ==
> *SortMergeJoin [k#62, j#61], [k#101, j#100], Inner
> :- *Sort [k#62 ASC NULLS FIRST, j#61 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(k#62, j#61, 200)
> :     +- *Project [i#60, j#61, k#62]
> :        +- *Filter (isnotnull(k#62) && isnotnull(j#61))
> :           +- *FileScan orc default.table1[i#60,j#61,k#62] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
> +- *Sort [k#101 ASC NULLS FIRST, j#100 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(k#101, j#100, 200)
>       +- *Project [i#99, j#100, k#101]
>          +- *Filter (isnotnull(j#100) && isnotnull(k#101))
>             +- *FileScan orc default.table2[i#99,j#100,k#101] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/spark-warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
> {code}


