GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/17175

    [SPARK-19468][SQL][WIP] Rewrite physical Project operator's output 
partitioning and ordering to ensure no unnecessary shuffle/sort in Datasets

    ## What changes were proposed in this pull request?
    
    Before running the following example, set `spark.sql.autoBroadcastJoinThreshold` to `-1` to force Spark SQL to use `SortMergeJoin`.
    
    When we join two `Dataset`s as follows, the already sorted and partitioned `Dataset`s are planned with another `Sort` and `ShuffleExchange`.
    
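        // Imports needed outside spark-shell defaults
        import org.apache.spark.sql.functions.col
        import org.apache.spark.storage.StorageLevel

        val ds1 = Seq((0, 0), (1, 1)).toDS
          .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
        val ds2 = Seq((0, 0), (1, 1)).toDS
          .repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)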
    
        val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
        joined1.explain
    
        == Physical Plan ==
        *SortMergeJoin [_1#105._1], [_2#106._1], Inner
        :- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
        :  +- Exchange hashpartitioning(_1#105._1, 4)
        :     +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
        :        +- InMemoryTableScan [_1#83, _2#84]
        :              +- InMemoryRelation [_1#83, _2#84], true, 10000, StorageLevel(disk, 1 replicas)
        :                    +- *Sort [_1#83 ASC NULLS FIRST], false, 0
        :                       +- Exchange hashpartitioning(_1#83, 4)
        :                          +- LocalTableScan [_1#83, _2#84]
        +- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
           +- Exchange hashpartitioning(_2#106._1, 4)
              +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
                 +- InMemoryTableScan [_1#100, _2#101]
                       +- InMemoryRelation [_1#100, _2#101], true, 10000, StorageLevel(disk, 1 replicas)
                             +- *Sort [_1#83 ASC NULLS FIRST], false, 0
                                +- Exchange hashpartitioning(_1#83, 4)
                                   +- LocalTableScan [_1#83, _2#84]
    
    Two issues here.
    
    1. `InMemoryTableScan` doesn't report the correct `outputPartitioning` and `outputOrdering`. Its output attributes can differ from those of its relation's child plan, so it needs to rewrite the `outputPartitioning` and `outputOrdering` of that child plan in terms of its own output attributes.
    
    2. If a `Project` operator projects some of its input to a complex type (e.g., `named_struct(_1, _1#83, _2, _2#84) AS _1#105` above), its parent operator will use a field of that complex-typed attribute (e.g., `_1#105._1` above) in its `requiredChildDistribution` and `requiredChildOrdering`. Because `Project` passes on the `outputPartitioning` and `outputOrdering` of its child plan unchanged, they never match what the parent requires, so an extra shuffle and sort are always inserted on top of the `Project`.
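
To make the two rewrites concrete, here is a minimal sketch on a toy expression model. It is not the code in this patch and does not use Spark's Catalyst classes: `Attr`, `GetField`, `NamedStruct`, `Alias`, `remap`, and `throughProject` are all hypothetical names introduced for illustration. It only shows how a partitioning key such as `_1#83` could be re-expressed as `_2#106._1`, using the attribute ids from the second join branch of the plan above.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst expressions.
sealed trait Expr
final case class Attr(name: String, id: Int) extends Expr              // e.g. _1#83
final case class GetField(child: Expr, field: String) extends Expr     // e.g. _2#106._1
final case class NamedStruct(fields: Seq[(String, Expr)]) extends Expr // named_struct(...)
final case class Alias(child: Expr, name: String, id: Int) {
  def toAttr: Attr = Attr(name, id)
}

object ProjectRewriteSketch {
  // Issue 1: re-express `e` over the scan's own output attributes,
  // pairing them with the cached plan's output by position.
  def remap(e: Expr, cachedOutput: Seq[Attr], scanOutput: Seq[Attr]): Expr =
    cachedOutput.zip(scanOutput)
      .collectFirst { case (from, to) if from == e => to }
      .getOrElse(e)

  // Issue 2: if `e` is stored as a field of a struct built by the project list,
  // express it as a field access on the alias's output attribute.
  def throughProject(e: Expr, projectList: Seq[Alias]): Expr = {
    val candidates = projectList.collect {
      case a @ Alias(NamedStruct(fields), _, _) =>
        fields.collectFirst {
          case (fieldName, value) if value == e => GetField(a.toAttr, fieldName)
        }
    }
    candidates.flatten.headOption.getOrElse(e)
  }

  // Render an expression in the style used by the physical plan output.
  def show(e: Expr): String = e match {
    case Attr(n, id)     => s"$n#$id"
    case GetField(c, f)  => s"${show(c)}.$f"
    case NamedStruct(fs) =>
      fs.map { case (n, v) => s"$n, ${show(v)}" }.mkString("named_struct(", ", ", ")")
  }

  def main(args: Array[String]): Unit = {
    val cachedOutput = Seq(Attr("_1", 83), Attr("_2", 84))   // relation's child plan
    val scanOutput   = Seq(Attr("_1", 100), Attr("_2", 101)) // InMemoryTableScan
    val projectList  = Seq(
      Alias(NamedStruct(Seq("_1" -> scanOutput(0), "_2" -> scanOutput(1))), "_2", 106))

    val key          = Attr("_1", 83) // key of hashpartitioning(_1#83, 4)
    val afterScan    = remap(key, cachedOutput, scanOutput)
    val afterProject = throughProject(afterScan, projectList)
    println(show(afterProject)) // prints _2#106._1
  }
}
```

With the key rewritten this way, the partitioning reported by the `Project` would line up with the `hashpartitioning(_2#106._1, 4)` that the join requires, which is the condition under which the extra `Exchange` and `Sort` become unnecessary.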
    
    
    ## How was this patch tested?
    
    Jenkins tests.
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 ensure-no-unnecessary-shuffle

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17175.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17175
    
----
commit 6e4eba6c9c637f9af11dae17a4ee2f1b39ee00be
Author: Liang-Chi Hsieh <vii...@gmail.com>
Date:   2017-03-06T08:56:38Z

    Rewrite physical Project operator's output partitioning and ordering to 
ensure no unnecessary shuffle/sort in Datasets.

----

