adriangb opened a new issue, #17625:
URL: https://github.com/apache/datafusion/issues/17625

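Currently, a partitioned hash join hashes the join keys at least twice: once in `RepartitionExec` (to assign each row to an output partition) and once in `HashJoinExec` (to build and probe its hash table). For example, on TPC-H SF10: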
   
```sql
CREATE EXTERNAL TABLE customer STORED AS PARQUET LOCATION 'benchmarks/data/tpch_sf10/customer/';
CREATE EXTERNAL TABLE orders STORED AS PARQUET LOCATION 'benchmarks/data/tpch_sf10/orders/';

EXPLAIN
SELECT *
FROM customer
JOIN orders on c_custkey = o_custkey
WHERE c_phone = '25-989-741-2988';

-- +---------------+------------------------------------------------------------+
-- | plan_type     | plan                                                       |
-- +---------------+------------------------------------------------------------+
-- | physical_plan | ┌───────────────────────────┐                              |
-- |               | │    CoalesceBatchesExec    │                              |
-- |               | │    --------------------   │                              |
-- |               | │     target_batch_size:    │                              |
-- |               | │            8192           │                              |
-- |               | └─────────────┬─────────────┘                              |
-- |               | ┌─────────────┴─────────────┐                              |
-- |               | │        HashJoinExec       │                              |
-- |               | │    --------------------   │                              |
-- |               | │            on:            ├──────────────┐               |
-- |               | │  (c_custkey = o_custkey)  │              │               |
-- |               | └─────────────┬─────────────┘              │               |
-- |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
-- |               | │    CoalesceBatchesExec    ││    CoalesceBatchesExec    │ |
-- |               | │    --------------------   ││    --------------------   │ |
-- |               | │     target_batch_size:    ││     target_batch_size:    │ |
-- |               | │            8192           ││            8192           │ |
-- |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
-- |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
-- |               | │      RepartitionExec      ││      RepartitionExec      │ |
-- |               | │    --------------------   ││    --------------------   │ |
-- |               | │ partition_count(in->out): ││ partition_count(in->out): │ |
-- |               | │          12 -> 12         ││          12 -> 12         │ |
-- |               | │                           ││                           │ |
-- |               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
-- |               | │  Hash([c_custkey@0], 12)  ││  Hash([o_custkey@1], 12)  │ |
-- |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
-- |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
-- |               | │    CoalesceBatchesExec    ││       DataSourceExec      │ |
-- |               | │    --------------------   ││    --------------------   │ |
-- |               | │     target_batch_size:    ││         files: 23         │ |
-- |               | │            8192           ││      format: parquet      │ |
-- |               | │                           ││      predicate: true      │ |
-- |               | └─────────────┬─────────────┘└───────────────────────────┘ |
-- |               | ┌─────────────┴─────────────┐                              |
-- |               | │         FilterExec        │                              |
-- |               | │    --------------------   │                              |
-- |               | │         predicate:        │                              |
-- |               | │ c_phone = 25-989-741-2988 │                              |
-- |               | └─────────────┬─────────────┘                              |
-- |               | ┌─────────────┴─────────────┐                              |
-- |               | │       DataSourceExec      │                              |
-- |               | │    --------------------   │                              |
-- |               | │         files: 23         │                              |
-- |               | │      format: parquet      │                              |
-- |               | │                           │                              |
-- |               | │         predicate:        │                              |
-- |               | │ c_phone = 25-989-741-2988 │                              |
-- |               | └───────────────────────────┘                              |
-- |               |                                                            |
-- +---------------+------------------------------------------------------------+
-- 1 row(s) fetched.
-- Elapsed 0.019 seconds.
```
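A minimal Python sketch (not DataFusion code) of where the two hashes happen on the build side of this plan. `hash_key`, `repartition`, and `build_hash_table` are hypothetical stand-ins for the work done by `RepartitionExec` and `HashJoinExec`, with a counter added to make the repeated hashing visible:

```python
from collections import defaultdict

HASH_CALLS = 0  # counts how many times a join key is hashed


def hash_key(key):
    """Hypothetical stand-in for the engine's key hash function."""
    global HASH_CALLS
    HASH_CALLS += 1
    return hash(key)


def repartition(rows, key_col, num_partitions):
    """Hash-partition rows: the first hash of each key (like RepartitionExec)."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash_key(row[key_col]) % num_partitions].append(row)
    return parts


def build_hash_table(rows, key_col):
    """Build side of the join: the same key is hashed a second time.

    Rows are bucketed by hash; a real probe would also compare the keys.
    """
    table = defaultdict(list)
    for row in rows:
        table[hash_key(row[key_col])].append(row)
    return table


customers = [{"c_custkey": k, "c_phone": f"phone-{k}"} for k in range(8)]
parts = repartition(customers, "c_custkey", num_partitions=4)
tables = [build_hash_table(p, "c_custkey") for p in parts]
print(HASH_CALLS)  # 16: two hashes per build-side row
```

The probe side (`orders`) pays the same double cost: once in its `RepartitionExec` and once when probing.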
   
The proposal is to change the plan to something like:
   
```
|               | ┌─────────────┴─────────────┐                              |
|               | │        HashJoinExec       │                              |
|               | │    --------------------   │                              |
|               | │            on:            ├──────────────┐               |
|               | │  (c_custkey = o_custkey)  │              │               |
|               | └─────────────┬─────────────┘              │               |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │    CoalesceBatchesExec    ││    CoalesceBatchesExec    │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │     target_batch_size:    ││     target_batch_size:    │ |
|               | │            8192           ││            8192           │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │      RepartitionExec      ││      RepartitionExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │ partition_count(in->out): ││ partition_count(in->out): │ |
|               | │          12 -> 12         ││          12 -> 12         │ |
|               | │                           ││                           │ |
|               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
|               | │    List(c_custkey, 12)    ││    List(o_custkey, 12)    │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │       ProjectionExec      ││       ProjectionExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │           expr:           ││           expr:           │ |
|               | │  *, hash_part(c_custkey)  ││  *, hash_part(o_custkey)  │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
```
   
The idea is that an optimizer rule injects a projection that appends an extra column holding the partition hash of the join keys. `RepartitionExec` then uses this column to repartition, and `HashJoinExec` uses it for build-side hash table lookups, then drops the column before emitting batches to its parent node.
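A minimal Python sketch of the proposed flow follows. It is not DataFusion code: the helper column name `__hash`, `hash_key`, and the other functions are hypothetical. The hash is computed once in the injected projection; the repartition step and the join both reuse it, and the join drops it from its output. The reuse is only valid if both operators agree on the hash function and seed, which is why a single `hash_key` serves both sides here.

```python
HASH_COL = "__hash"  # hypothetical name for the injected hash column
HASH_CALLS = 0       # counts how many times a join key is hashed


def hash_key(key):
    """Hypothetical stand-in for the engine's key hash function."""
    global HASH_CALLS
    HASH_CALLS += 1
    return hash(key)


def project_hash(rows, key_col):
    """The injected projection `*, hash(key)`: the only place keys are hashed."""
    return [{**row, HASH_COL: hash_key(row[key_col])} for row in rows]


def repartition(rows, num_partitions):
    """Route rows by the precomputed hash column instead of rehashing the key."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[row[HASH_COL] % num_partitions].append(row)
    return parts


def hash_join(build, probe, build_key, probe_key):
    """Join one partition pair, reusing the hash column on both sides."""
    table = {}
    for row in build:
        table.setdefault(row[HASH_COL], []).append(row)
    out = []
    for row in probe:
        for match in table.get(row[HASH_COL], []):
            if match[build_key] == row[probe_key]:  # guard against hash collisions
                joined = {**match, **row}
                joined.pop(HASH_COL)  # drop the helper column before emitting
                out.append(joined)
    return out


customers = project_hash([{"c_custkey": k} for k in range(8)], "c_custkey")
orders = project_hash(
    [{"o_orderkey": i, "o_custkey": i % 8} for i in range(20)], "o_custkey"
)
n = 4
build_parts, probe_parts = repartition(customers, n), repartition(orders, n)
result = [
    r
    for b, p in zip(build_parts, probe_parts)
    for r in hash_join(b, p, "c_custkey", "o_custkey")
]
print(len(result), HASH_CALLS)  # 20 28: one hash per input row
```

With 8 build rows and 20 probe rows, each row is hashed exactly once (28 calls), where the current plan hashes each row in both operators. The trade-off is carrying one extra column through the repartition.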
   
Since projections are already pushed down, we may even be able to push this one all the way into the scan, which ties in with #17599.
