2010YOUY01 commented on issue #18341: URL: https://github.com/apache/datafusion/issues/18341#issuecomment-3466350575
I think this might be related to the issue. There is a Discord discussion about slow TPC-H Q1: https://discord.com/channels/885562378132000778/1290751484807352412/1432863136612089959

From the plan, there is only one hash repartition operator and no round-robin repartition. I believe round-robin repartitions are necessary unless the data source is very small (and we have to push them further down, as explained in this issue, otherwise they're useless).

The reason is that the pushed-down filter `l_shipdate <= date '1998-09-02'` may filter out most of the rows in some Parquet scan partitions. (Note: the case below is imaginary. I haven't verified that it is the actual reason Q1 is slow, but I think it's possible, so it's necessary to fix the `RepartitionExec` issue.)

For example, let's say `datafusion.execution.target_partitions=4`, so there are 4 parallel Parquet scanners and 4 parallel aggregate operators consuming their output:

- partition 1: `l_shipdate` has range [1990, 2000]
- partition 2: `l_shipdate` has range [2000, 2005]
- partition 3: `l_shipdate` has range [2005, 2010]
- partition 4: `l_shipdate` has range [2010, 2020]

The consequence is that partitions 2, 3, and 4 produce no output from the Parquet reader, so only one partition is busy and the available CPUs can't be fully utilized.
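To make the imaginary case above concrete, here is a toy Python model of it. It is only a sketch under stated assumptions, not DataFusion code: the helper names `surviving_rows` and `round_robin`, the 1000 rows per year, the 100-row batch size, and the year-granularity cutoff (standing in for `l_shipdate <= date '1998-09-02'`) are all made up for illustration.

```python
# Toy model of filter-induced partition skew. All numbers are hypothetical.

def surviving_rows(partitions, cutoff_year, rows_per_year=1000):
    """Rows each scan partition emits after the filter `year <= cutoff_year`."""
    out = []
    for lo, hi in partitions:
        # Half-open range [lo, hi) so adjacent partitions do not overlap.
        out.append(sum(rows_per_year for y in range(lo, hi) if y <= cutoff_year))
    return out


def round_robin(per_partition_rows, target_partitions, batch_size=100):
    """Deal surviving rows out in fixed-size batches, one output partition per batch in turn."""
    out = [0] * target_partitions
    next_slot = 0
    for rows in per_partition_rows:
        while rows > 0:
            n = min(batch_size, rows)
            out[next_slot % target_partitions] += n
            next_slot += 1
            rows -= n
    return out


# The four l_shipdate ranges from the example, at year granularity.
partitions = [(1990, 2000), (2000, 2005), (2005, 2010), (2010, 2020)]

before = surviving_rows(partitions, cutoff_year=1998)
after = round_robin(before, target_partitions=4)

print("rows per partition after filter:     ", before)
print("rows per partition after round-robin:", after)
```

In this model only the first partition has rows that pass the filter, so without a round-robin step a single aggregate does all the work. Dealing the surviving batches across four outputs spreads the same rows nearly evenly.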
<details> <summary> TPCH Q1 plan </summary> ``` > CREATE EXTERNAL TABLE IF NOT EXISTS lineitem STORED AS parquet LOCATION '/Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem'; > explain select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem where l_shipdate <= date '1998-09-02' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus; +---------------+-------------------------------+ | plan_type | plan | +---------------+-------------------------------+ | physical_plan | ┌───────────────────────────┐ | | | │ SortPreservingMergeExec │ | | | │ -------------------- │ | | | │ l_returnflag ASC NULLS │ | | | │ LAST, l_linestatus │ | | | │ ASC NULLS LAST │ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ SortExec │ | | | │ -------------------- │ | | | │ l_returnflag@0 ASC NULLS │ | | | │ LAST, l_linestatus@1 │ | | | │ ASC NULLS LAST │ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ ProjectionExec │ | | | │ -------------------- │ | | | │ avg_disc: │ | | | │ avg(lineitem.l_discount) │ | | | │ │ | | | │ avg_price: │ | | | │ avg(lineitem │ | | | │ .l_extendedp │ | | | │ rice) │ | | | │ │ | | | │ avg_qty: │ | | | │ avg(lineitem.l_quantity) │ | | | │ │ | | | │ count_order: │ | | | │ count(Int64(1)) │ | | | │ │ | | | │ l_linestatus: │ | | | │ l_linestatus │ | | | │ │ | | | │ l_returnflag: │ | | | │ l_returnflag │ | | | │ │ | | | │ sum_base_price: │ | | | │ sum(lineitem │ | | | │ .l_extendedp │ | | | │ rice) │ | | | │ │ | | | │ sum_charge: │ | | | │ sum(lineitem │ | | | │ .l_extendedp │ | | | │ rice * Int64(1) - lineitem│ | | | │ ... 
│ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ AggregateExec │ | | | │ -------------------- │ | | | │ aggr: │ | | | │ sum(lineitem.l_quantity), │ | | | │ sum(lineitem │ | | | │ .l_extendedpric │ | | | │ e), sum(lineitem │ | | | │ .l_extendedprice │ | | | │ * Int64(1) - lineitem │ | | | │ .l_discount), sum │ | | | │ (lineitem │ | | | │ .l_extendedpri │ | | | │ ce * Int64(1) - lineitem │ | | | │ .l_discount * Int64(1) │ | | | │ + lineitem.l_tax), avg │ | | | │ (lineitem.l_quantity), │ | | | │ avg(lineitem │ | | | │ .l_extendedpri │ | | | │ ce), avg(lineitem │ | | | │ .l_discount), │ | | | │ count(1) │ | | | │ │ | | | │ group_by: │ | | | │ l_returnflag, l_linestatus│ | | | │ │ | | | │ mode: │ | | | │ FinalPartitioned │ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ CoalesceBatchesExec │ | | | │ -------------------- │ | | | │ target_batch_size: │ | | | │ 8192 │ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ RepartitionExec │ | | | │ -------------------- │ | | | │ partition_count(in->out): │ | | | │ 14 -> 14 │ | | | │ │ | | | │ partitioning_scheme: │ | | | │ Hash([l_returnflag@0, │ | | | │ l_linestatus@1], 14) │ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ AggregateExec │ | | | │ -------------------- │ | | | │ aggr: │ | | | │ sum(lineitem.l_quantity), │ | | | │ sum(lineitem │ | | | │ .l_extendedpric │ | | | │ e), sum(lineitem │ | | | │ .l_extendedprice │ | | | │ * Int64(1) - lineitem │ | | | │ .l_discount), sum │ | | | │ (lineitem │ | | | │ .l_extendedpri │ | | | │ ce * Int64(1) - lineitem │ | | | │ .l_discount * Int64(1) │ | | | │ + lineitem.l_tax), avg │ | | | │ (lineitem.l_quantity), │ | | | │ avg(lineitem │ | | | │ .l_extendedpri │ | | | │ ce), avg(lineitem │ | | | │ .l_discount), │ | | | │ count(1) │ | | | │ │ | | | │ group_by: │ | | | │ l_returnflag, l_linestatus│ | | | │ │ | | | │ mode: Partial │ | | | 
└─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ ProjectionExec │ | | | │ -------------------- │ | | | │ __common_expr_1: │ | | | │ l_extendedprice * (Some(1)│ | | | │ ,20,0 - l_discount) │ | | | │ │ | | | │ l_discount: │ | | | │ l_discount │ | | | │ │ | | | │ l_extendedprice: │ | | | │ l_extendedprice │ | | | │ │ | | | │ l_linestatus: │ | | | │ l_linestatus │ | | | │ │ | | | │ l_quantity: │ | | | │ l_quantity │ | | | │ │ | | | │ l_returnflag: │ | | | │ l_returnflag │ | | | │ │ | | | │ l_tax: l_tax │ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ CoalesceBatchesExec │ | | | │ -------------------- │ | | | │ target_batch_size: │ | | | │ 8192 │ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ FilterExec │ | | | │ -------------------- │ | | | │ predicate: │ | | | │ l_shipdate <= 1998-09-02 │ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ DataSourceExec │ | | | │ -------------------- │ | | | │ files: 21 │ | | | │ format: parquet │ | | | │ │ | | | │ predicate: │ | | | │ l_shipdate <= 1998-09-02 │ | | | └───────────────────────────┘ | | | | +---------------+-------------------------------+ 1 row(s) fetched. Elapsed 0.040 seconds. ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
