adriangb opened a new issue, #17625: URL: https://github.com/apache/datafusion/issues/17625
Currently a repartitioned hash join computes the hashes of the join keys at least twice, once in `RepartitionExec` and once in `HashJoinExec`:

```
CREATE EXTERNAL TABLE customer STORED AS PARQUET LOCATION 'benchmarks/data/tpch_sf10/customer/';
CREATE EXTERNAL TABLE orders STORED AS PARQUET LOCATION 'benchmarks/data/tpch_sf10/orders/';
EXPLAIN SELECT * FROM customer JOIN orders on c_custkey = o_custkey WHERE c_phone = '25-989-741-2988';
-- +---------------+------------------------------------------------------------+
-- | plan_type     | plan                                                       |
-- +---------------+------------------------------------------------------------+
-- | physical_plan | ┌───────────────────────────┐                              |
-- |               | │    CoalesceBatchesExec    │                              |
-- |               | │    --------------------   │                              |
-- |               | │     target_batch_size:    │                              |
-- |               | │            8192           │                              |
-- |               | └─────────────┬─────────────┘                              |
-- |               | ┌─────────────┴─────────────┐                              |
-- |               | │        HashJoinExec       │                              |
-- |               | │    --------------------   │                              |
-- |               | │            on:            ├──────────────┐               |
-- |               | │  (c_custkey = o_custkey)  │              │               |
-- |               | └─────────────┬─────────────┘              │               |
-- |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
-- |               | │    CoalesceBatchesExec    ││    CoalesceBatchesExec    │ |
-- |               | │    --------------------   ││    --------------------   │ |
-- |               | │     target_batch_size:    ││     target_batch_size:    │ |
-- |               | │            8192           ││            8192           │ |
-- |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
-- |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
-- |               | │      RepartitionExec      ││      RepartitionExec      │ |
-- |               | │    --------------------   ││    --------------------   │ |
-- |               | │ partition_count(in->out): ││ partition_count(in->out): │ |
-- |               | │          12 -> 12         ││          12 -> 12         │ |
-- |               | │                           ││                           │ |
-- |               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
-- |               | │  Hash([c_custkey@0], 12)  ││  Hash([o_custkey@1], 12)  │ |
-- |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
-- |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
-- |               | │    CoalesceBatchesExec    ││       DataSourceExec      │ |
-- |               | │    --------------------   ││    --------------------   │ |
-- |               | │     target_batch_size:    ││         files: 23         │ |
-- |               | │            8192           ││      format: parquet      │ |
-- |               | │                           ││      predicate: true      │ |
-- |               | └─────────────┬─────────────┘└───────────────────────────┘ |
-- |               | ┌─────────────┴─────────────┐                              |
-- |               | │         FilterExec        │                              |
-- |               | │    --------------------   │                              |
-- |               | │         predicate:        │                              |
-- |               | │ c_phone = 25-989-741-2988 │                              |
-- |               | └─────────────┬─────────────┘                              |
-- |               | ┌─────────────┴─────────────┐                              |
-- |               | │       DataSourceExec      │                              |
-- |               | │    --------------------   │                              |
-- |               | │         files: 23         │                              |
-- |               | │      format: parquet      │                              |
-- |               | │                           │                              |
-- |               | │         predicate:        │                              |
-- |               | │ c_phone = 25-989-741-2988 │                              |
-- |               | └───────────────────────────┘                              |
-- |               |                                                            |
-- +---------------+------------------------------------------------------------+
-- 1 row(s) fetched.
-- Elapsed 0.019 seconds.
```

The proposal is to change the plan to something like:

```
|               | ┌─────────────┴─────────────┐                              |
|               | │        HashJoinExec       │                              |
|               | │    --------------------   │                              |
|               | │            on:            ├──────────────┐               |
|               | │  (c_custkey = o_custkey)  │              │               |
|               | └─────────────┬─────────────┘              │               |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │    CoalesceBatchesExec    ││    CoalesceBatchesExec    │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │     target_batch_size:    ││     target_batch_size:    │ |
|               | │            8192           ││            8192           │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │      RepartitionExec      ││      RepartitionExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │ partition_count(in->out): ││ partition_count(in->out): │ |
|               | │          12 -> 12         ││          12 -> 12         │ |
|               | │                           ││                           │ |
|               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
|               | │    List(o_custkey, 12)    ││    List(c_custkey, 12)    │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌───────────────────────────┐┌───────────────────────────┐ |
|               | │       ProjectionExec      ││       ProjectionExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │           expr:           ││           expr:           │ |
|               | │  *, hash_part(o_custkey)  ││  *, hash_part(c_custkey)  │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
```

The point is that an optimizer rule injects an extra column with the partition calculation as a projection. `RepartitionExec` then uses this column for repartitioning, and `HashJoinExec` uses it to look up which build-side hash table to use, then drops it before emitting batches to its parent node. Since we push projections down, we may even be able to push this projection all the way into the scan, which ties in with #17599.
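To make the data flow concrete, here is a minimal, standalone sketch of the idea in plain Rust: hash each join key once in a projection step, route rows by that stored hash, and reuse the same hash for the per-partition build and probe before dropping it from the output. This is not DataFusion code. Every name here (`HashedRow`, `hash_key`, `project_with_hash`, `repartition`, `hash_join_partition`, `join`) is hypothetical, and the row-at-a-time layout and `DefaultHasher` are assumptions chosen only to keep the example short.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A row plus the hash of its join key, computed once up front.
/// `key_hash` plays the role of the injected projection column (hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct HashedRow {
    key: i64,
    payload: String,
    key_hash: u64,
}

/// Hypothetical stand-in for the hash expression used by the projection.
fn hash_key(key: i64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// "Projection" step: the only place where keys are hashed.
fn project_with_hash(rows: Vec<(i64, String)>) -> Vec<HashedRow> {
    rows.into_iter()
        .map(|(key, payload)| HashedRow { key_hash: hash_key(key), key, payload })
        .collect()
}

/// "Repartition" step: routes each row using the stored hash, no rehashing.
fn repartition(rows: Vec<HashedRow>, n: usize) -> Vec<Vec<HashedRow>> {
    let mut parts = vec![Vec::new(); n];
    for row in rows {
        let p = (row.key_hash % n as u64) as usize;
        parts[p].push(row);
    }
    parts
}

/// "Hash join" step for one partition: a small chained table whose bucket
/// index is derived from the stored hash. The helper column is dropped
/// from the output tuples.
fn hash_join_partition(build: &[HashedRow], probe: &[HashedRow]) -> Vec<(i64, String, String)> {
    let num_buckets = build.len().max(1).next_power_of_two();
    // Use the upper bits: the lower bits were already used to pick the partition.
    let slot = |h: u64| ((h >> 32) as usize) & (num_buckets - 1);

    let mut buckets: Vec<Vec<&HashedRow>> = vec![Vec::new(); num_buckets];
    for row in build {
        buckets[slot(row.key_hash)].push(row);
    }

    let mut out = Vec::new();
    for p in probe {
        for b in &buckets[slot(p.key_hash)] {
            // Different keys can share a bucket (or a hash), so compare keys.
            if b.key == p.key {
                out.push((b.key, b.payload.clone(), p.payload.clone()));
            }
        }
    }
    out
}

/// Runs both sides through projection and repartitioning, then joins
/// matching partitions. Output is sorted for deterministic comparison.
fn join(left: Vec<(i64, String)>, right: Vec<(i64, String)>, n: usize) -> Vec<(i64, String, String)> {
    let left_parts = repartition(project_with_hash(left), n);
    let right_parts = repartition(project_with_hash(right), n);
    let mut out = Vec::new();
    for (build, probe) in left_parts.iter().zip(right_parts.iter()) {
        out.extend(hash_join_partition(build, probe));
    }
    out.sort();
    out
}
```

Calling `join(customers, orders, 12)` on two vectors of `(key, payload)` pairs would mirror the 12-way repartitioning in the plans above. The sketch only works because both sides use the same hash function and the same partition count, which the real optimizer rule would also have to guarantee.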
