nuno-faria commented on PR #103: URL: https://github.com/apache/datafusion-site/pull/103#issuecomment-3263709781
> > I think it might be worth looking at always combining the partial
> > filters with `OR`, unless maybe if the dynamic filter becomes too large.
>
> @nuno-faria could you clarify what you mean? We combine the filters from
> different partitions with `OR` currently.
I see. I think the problem then might be that each partition keeps a single
min/max bound per column, which can sometimes make the final dynamic filter
cover far more data than necessary.
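To make the effect concrete, here is a minimal Python sketch (not DataFusion code) of how one min/max range per partition, combined with `OR`, could produce either a very wide or a very tight filter. The names `partition_bounds` and `dynamic_filter` are hypothetical and only model the filter strings shown in the plans below.

```python
from typing import List, Optional, Tuple

Bounds = Tuple[int, int]  # hypothetical (min, max) pair for one join-key column


def partition_bounds(build_keys: List[int]) -> Optional[Bounds]:
    """Collapse all build-side keys seen by one partition into a single range."""
    if not build_keys:
        return None  # an empty partition contributes nothing to the filter
    return (min(build_keys), max(build_keys))


def dynamic_filter(partitions: List[List[int]], column: str = "o_custkey") -> str:
    """OR together the per-partition ranges (assumed model of the final filter)."""
    ranges = [b for b in map(partition_bounds, partitions) if b is not None]
    return " OR ".join(f"{column} >= {lo} AND {column} <= {hi}" for lo, hi in ranges)


# Both matching customers end up in the same partition: one wide range.
wide = dynamic_filter([[1, 2999998]])
# Each matching customer ends up in a different partition: two tight ranges.
tight = dynamic_filter([[2999998], [1], []])
print(wide)
print(tight)
```

In this model, the wide case covers every key between the two extremes, while the tight case covers only the two keys that actually exist on the build side.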
Here is an example with TPC-H data (sf=20). This query will retrieve the
customers with id 1 and id 2999998 (two extremes of the customer table).
```sql
-- Enable filter pushdown before running the query so that it takes effect.
SET datafusion.execution.parquet.pushdown_filters = true;
EXPLAIN ANALYZE
SELECT *
FROM customer
JOIN orders ON c_custkey = o_custkey
WHERE c_phone = '25-989-741-2988' OR c_phone = '29-590-168-8634';
```
If we execute with one partition, essentially the entire "orders" table is
read, since the dynamic filter is `o_custkey@1 >= 1 AND o_custkey@1 <= 2999998`,
which is expensive:
```sql
-- SET datafusion.execution.target_partitions = 1;
+-------------------+------+
| plan_type         | plan |
+-------------------+------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192,
metrics=[output_rows=26, elapsed_compute=4.105301ms]
|
| | HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(c_custkey@0, o_custkey@1)], metrics=[output_rows=26,
elapsed_compute=264.951501ms, build_input_batches=2, build_input_rows=2,
input_batches=3663, input_rows=29999976, output_batches=3663,
build_mem_used=25460869, build_time=97.4156ms, join_time=75.9028ms]
|
| | DataSourceExec: file_groups={1 group:
[[/customer.parquet:0..247530905]]}, projection=[c_custkey, c_name, c_address,
c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment], file_type=parquet,
predicate=c_phone@4 = 25-989-741-2988 OR c_phone@4 = 29-590-168-8634,
pruning_predicate=c_phone_null_count@2 != row_count@3 AND c_phone_min@0 <=
25-989-741-2988 AND 25-989-741-2988 <= c_phone_max@1 OR c_phone_null_count@2 !=
row_count@3 AND c_phone_min@0 <= 29-590-168-8634 AND 29-590-168-8634 <=
c_phone_max@1, required_guarantees=[c_phone in (25-989-741-2988,
29-590-168-8634)], metrics=[output_rows=2, elapsed_compute=1ns,
batches_split=0, bytes_scanned=47855367, file_open_errors=0,
file_scan_errors=0, files_ranges_pruned_statistics=0,
num_predicate_creation_errors=0, page_index_rows_matched=0,
page_index_rows_pruned=0, predicate_evaluation_errors=0,
pushdown_rows_matched=2, pushdown_rows_pruned=2999998,
row_groups_matched_bloom_filter=0, row_groups_matched_statistics=25,
row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0,
bloom_filter_eval_time=231.501µs, metadata_load_time=82.801µs,
page_index_eval_time=101ns, row_pushdown_eval_time=5.427801ms,
statistics_eval_time=15.901µs, time_elapsed_opening=446.1µs,
time_elapsed_processing=97.3158ms, time_elapsed_scanning_total=114.4778ms,
time_elapsed_scanning_until_data=20.2666ms]
|
| | DataSourceExec: file_groups={1 group:
[[/orders.parquet:0..1270839183]]}, projection=[o_orderkey, o_custkey,
o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk,
o_shippriority, o_comment], file_type=parquet,
predicate=DynamicFilterPhysicalExpr [ o_custkey@1 >= 1 AND o_custkey@1 <=
2999998 ], pruning_predicate=o_custkey_null_count@1 != row_count@2 AND
o_custkey_max@0 >= 1 AND o_custkey_null_count@1 != row_count@2 AND
o_custkey_min@3 <= 2999998, required_guarantees=[],
metrics=[output_rows=29999976, elapsed_compute=1ns, batches_split=0,
bytes_scanned=1269546359, file_open_errors=0, file_scan_errors=0,
files_ranges_pruned_statistics=0, num_predicate_creation_errors=0,
page_index_rows_matched=0, page_index_rows_pruned=0,
predicate_evaluation_errors=0, pushdown_rows_matched=29999976,
pushdown_rows_pruned=24, row_groups_matched_bloom_filter=0,
row_groups_matched_statistics=245, row_groups_pruned_bloom_filter=0,
row_groups_pruned_statistics=0,
bloom_filter_eval_time=1.201701ms, metadata_load_time=418.601µs,
page_index_eval_time=101ns, row_pushdown_eval_time=40.252001ms,
statistics_eval_time=23.701µs, time_elapsed_opening=1.7314ms,
time_elapsed_processing=3.0919729s, time_elapsed_scanning_total=3.6757354s,
time_elapsed_scanning_until_data=12.4854ms]
|
| |
|
+-------------------+------+
3.796s
predicate=DynamicFilterPhysicalExpr [ o_custkey@1 >= 1 AND o_custkey@1 <=
2999998 ]
"orders" output_rows=29999976
```
With two partitions, we see the same behavior:
```sql
-- SET datafusion.execution.target_partitions = 2;
+-------------------+------+
| plan_type         | plan |
+-------------------+------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192,
metrics=[output_rows=26, elapsed_compute=6.383902ms]
|
| | HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(c_custkey@0, o_custkey@1)], metrics=[output_rows=26,
elapsed_compute=87.705702ms, build_input_batches=1, build_input_rows=2,
input_batches=2929, input_rows=29999976, output_batches=4357,
build_mem_used=768, build_time=113.8µs, join_time=87.5417ms]
|
| | CoalesceBatchesExec: target_batch_size=8192,
metrics=[output_rows=2, elapsed_compute=41.801µs]
|
| | RepartitionExec:
partitioning=Hash([c_custkey@0], 2), input_partitions=2,
metrics=[fetch_time=118.5064ms, repartition_time=24.6µs, send_time=8.802µs]
|
| | DataSourceExec: file_groups={2 groups:
[[/customer.parquet:0..123765453], [/customer.parquet:123765453..247530905]]},
projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal,
c_mktsegment, c_comment], file_type=parquet, predicate=c_phone@4 =
25-989-741-2988 OR c_phone@4 = 29-590-168-8634,
pruning_predicate=c_phone_null_count@2 != row_count@3 AND c_phone_min@0 <=
25-989-741-2988 AND 25-989-741-2988 <= c_phone_max@1 OR c_phone_null_count@2 !=
row_count@3 AND c_phone_min@0 <= 29-590-168-8634 AND 29-590-168-8634 <=
c_phone_max@1, required_guarantees=[c_phone in (25-989-741-2988,
29-590-168-8634)], metrics=[output_rows=2, elapsed_compute=2ns,
batches_split=0, bytes_scanned=47855367, file_open_errors=0,
file_scan_errors=0, files_ranges_pruned_statistics=0,
num_predicate_creation_errors=0, page_index_rows_matched=0,
page_index_rows_pruned=0, predicate_evaluation_errors=0,
pushdown_rows_matched=2, pushdown_rows_pruned=2999998,
row_groups_matched_bloom_filter=0, row_groups_matched_statistics=25,
row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0,
bloom_filter_eval_time=244.102µs, metadata_load_time=179.202µs,
page_index_eval_time=103ns, row_pushdown_eval_time=5.558002ms,
statistics_eval_time=34.402µs, time_elapsed_opening=630.4µs,
time_elapsed_processing=100.8522ms, time_elapsed_scanning_total=117.8869ms,
time_elapsed_scanning_until_data=72.8676ms]
|
| | CoalesceBatchesExec: target_batch_size=8192,
metrics=[output_rows=29999976, elapsed_compute=1.506493001s]
|
| | RepartitionExec:
partitioning=Hash([o_custkey@1], 2), input_partitions=2,
metrics=[fetch_time=4.2593116s, repartition_time=760.0087ms,
send_time=378.5069ms]
|
| | DataSourceExec: file_groups={2 groups:
[[/orders.parquet:0..635419592], [/orders.parquet:635419592..1270839183]]},
projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
o_orderpriority, o_clerk, o_shippriority, o_comment], file_type=parquet,
predicate=DynamicFilterPhysicalExpr [ o_custkey@1 >= 1 AND o_custkey@1 <=
2999998 ], pruning_predicate=o_custkey_null_count@1 != row_count@2 AND
o_custkey_max@0 >= 1 AND o_custkey_null_count@1 != row_count@2 AND
o_custkey_min@3 <= 2999998, required_guarantees=[],
metrics=[output_rows=29999976, elapsed_compute=2ns, batches_split=0,
bytes_scanned=1269546359, file_open_errors=0, file_scan_errors=0,
files_ranges_pruned_statistics=0, num_predicate_creation_errors=0,
page_index_rows_matched=0, page_index_rows_pruned=0,
predicate_evaluation_errors=0, pushdown_rows_matched=29999976,
pushdown_rows_pruned=24, row_groups_matched_bloom_filter=0,
row_groups_matched_statistics=245, row_groups_pruned_bloom_filter=0,
row_groups_pruned_statistics=0, bloom_filter_eval_time=1.293802ms,
metadata_load_time=3.246702ms, page_index_eval_time=103ns,
row_pushdown_eval_time=45.873302ms, statistics_eval_time=57.602µs,
time_elapsed_opening=4.7558ms, time_elapsed_processing=3.688255s,
time_elapsed_scanning_total=5.7125152s,
time_elapsed_scanning_until_data=26.3507ms]
|
| |
|
+-------------------+------+
2.944s
predicate=DynamicFilterPhysicalExpr [ o_custkey@1 >= 1 AND o_custkey@1 <=
2999998 ]
"orders" output_rows=29999976
```
With three partitions, the dynamic filter is now `o_custkey@1 >= 2999998 AND
o_custkey@1 <= 2999998 OR o_custkey@1 >= 1 AND o_custkey@1 <= 1`, which is
considerably more efficient as it will select only the relevant data:
```sql
-- SET datafusion.execution.target_partitions = 3;
+-------------------+------+
| plan_type         | plan |
+-------------------+------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192,
metrics=[output_rows=26, elapsed_compute=11.802µs]
|
| | HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(c_custkey@0, o_custkey@1)], metrics=[output_rows=26,
elapsed_compute=15.441303ms, build_input_batches=2, build_input_rows=2,
input_batches=2, input_rows=26, output_batches=2, build_mem_used=688,
build_time=7.7423ms, join_time=46.7µs]
|
| | CoalesceBatchesExec: target_batch_size=8192,
metrics=[output_rows=2, elapsed_compute=39.601µs]
|
| | RepartitionExec:
partitioning=Hash([c_custkey@0], 3), input_partitions=3,
metrics=[fetch_time=123.454ms, repartition_time=24.701µs, send_time=7.507µs]
|
| | DataSourceExec: file_groups={3 groups:
[[/customer.parquet:0..82510302], [/customer.parquet:82510302..165020604],
[/customer.parquet:165020604..247530905]]}, projection=[c_custkey, c_name,
c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment],
file_type=parquet, predicate=c_phone@4 = 25-989-741-2988 OR c_phone@4 =
29-590-168-8634, pruning_predicate=c_phone_null_count@2 != row_count@3 AND
c_phone_min@0 <= 25-989-741-2988 AND 25-989-741-2988 <= c_phone_max@1 OR
c_phone_null_count@2 != row_count@3 AND c_phone_min@0 <= 29-590-168-8634 AND
29-590-168-8634 <= c_phone_max@1, required_guarantees=[c_phone in
(25-989-741-2988, 29-590-168-8634)], metrics=[output_rows=2,
elapsed_compute=3ns, batches_split=0, bytes_scanned=47855367,
file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0,
num_predicate_creation_errors=0, page_index_rows_matched=0,
page_index_rows_pruned=0, predicate_evaluation_errors=0,
pushdown_rows_matched=2, pushdown_rows_pruned=2999998,
row_groups_matched_bloom_filter=0,
row_groups_matched_statistics=25, row_groups_pruned_bloom_filter=0,
row_groups_pruned_statistics=0, bloom_filter_eval_time=334.803µs,
metadata_load_time=276.703µs, page_index_eval_time=303ns,
row_pushdown_eval_time=5.721003ms, statistics_eval_time=52.803µs,
time_elapsed_opening=1.0154ms, time_elapsed_processing=105.2155ms,
time_elapsed_scanning_total=122.4323ms,
time_elapsed_scanning_until_data=91.6292ms]
|
| | CoalesceBatchesExec: target_batch_size=8192,
metrics=[output_rows=26, elapsed_compute=270.103µs]
|
| | RepartitionExec:
partitioning=Hash([o_custkey@1], 3), input_partitions=3,
metrics=[fetch_time=387.6644ms, repartition_time=383.4µs, send_time=132.503µs]
|
| | DataSourceExec: file_groups={3 groups:
[[/orders.parquet:0..423613061], [/orders.parquet:423613061..847226122],
[/orders.parquet:847226122..1270839183]]}, projection=[o_orderkey, o_custkey,
o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk,
o_shippriority, o_comment], file_type=parquet,
predicate=DynamicFilterPhysicalExpr [ o_custkey@1 >= 2999998 AND o_custkey@1 <=
2999998 OR o_custkey@1 >= 1 AND o_custkey@1 <= 1 ],
pruning_predicate=o_custkey_null_count@1 != row_count@2 AND o_custkey_max@0 >=
2999998 AND o_custkey_null_count@1 != row_count@2 AND o_custkey_min@3 <=
2999998 OR o_custkey_null_count@1 != row_count@2 AND o_custkey_max@0 >= 1 AND
o_custkey_null_count@1 != row_count@2 AND o_custkey_min@3 <= 1,
required_guarantees=[], metrics=[output_rows=26, elapsed_compute=3ns,
batches_split=0, bytes_scanned=143516849, file_open_errors=0,
file_scan_errors=0, files_ranges_pruned_statistics=0,
num_predicate_creation_errors=0, page_index_rows_matched=0,
page_index_rows_pruned=0, predicate_evaluation_errors=0,
pushdown_rows_matched=26, pushdown_rows_pruned=5775334,
row_groups_matched_bloom_filter=0, row_groups_matched_statistics=47,
row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=198,
bloom_filter_eval_time=561.103µs, metadata_load_time=6.536403ms,
page_index_eval_time=204ns, row_pushdown_eval_time=13.038503ms,
statistics_eval_time=146.203µs, time_elapsed_opening=7.6246ms,
time_elapsed_processing=318.3838ms, time_elapsed_scanning_total=380.7159ms,
time_elapsed_scanning_until_data=44.9063ms]
|
| |
|
+-------------------+------+
0.192s
predicate=DynamicFilterPhysicalExpr [ o_custkey@1 >= 2999998 AND o_custkey@1
<= 2999998 OR o_custkey@1 >= 1 AND o_custkey@1 <= 1 ]
"orders" output_rows=26
```
Presumably, no single build-side partition saw both extremes here, so each
partition's min/max range stayed tight, resulting in an efficient dynamic
filter.
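The difference in `row_groups_pruned_statistics` (0 versus 198) follows from how min/max statistics interact with the filter ranges. Below is a rough Python sketch of that check, mirroring the shape of the `pruning_predicate` in the plans (`max >= lo AND min <= hi`, with the null-count checks omitted). The row-group statistics are made-up values for illustration, not taken from the actual file.

```python
from typing import List, Tuple

Range = Tuple[int, int]  # inclusive (lo, hi)


def row_group_may_match(stats: Range, filter_ranges: List[Range]) -> bool:
    """A row group survives if its [min, max] overlaps any of the OR'd ranges."""
    rg_min, rg_max = stats
    return any(rg_max >= lo and rg_min <= hi for lo, hi in filter_ranges)


def count_pruned(row_groups: List[Range], filter_ranges: List[Range]) -> int:
    """Number of row groups that can be skipped using statistics alone."""
    return sum(not row_group_may_match(rg, filter_ranges) for rg in row_groups)


# Hypothetical (min, max) statistics for o_custkey in five row groups.
# Keys are spread randomly, so every group spans almost the whole key space.
row_groups = [(1, 2999950), (7, 2999998), (3, 2999990), (12, 2999971), (2, 2999996)]

pruned_wide = count_pruned(row_groups, [(1, 2999998)])
pruned_tight = count_pruned(row_groups, [(2999998, 2999998), (1, 1)])
print(pruned_wide, pruned_tight)
```

With the single wide range every row group overlaps the filter, so nothing can be skipped. With the two point ranges, only row groups whose statistics actually reach key 1 or key 2999998 survive.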
Would it be feasible to have
[`ColumnBounds`](https://github.com/apache/datafusion/blob/baf6f602879030dea741322d6f219d401983bb78/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs#L39)
include multiple ranges (which would then be combined with `OR`) instead of a
single min/max? I think this could solve the problem for this type of query.
One potential issue is a build side that returns many rows, which would make
the dynamic filter very large, but in that case we could merge ranges so that
their number does not exceed some N.
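One possible way to cap the number of ranges is sketched below in Python. This is only an illustration of the idea, not a proposed implementation: `merge_to_limit` and `max_ranges` are hypothetical names, keys are assumed to be integers, and the strategy of always closing the smallest gap first is one assumption among several reasonable ones.

```python
from typing import List, Tuple

Range = Tuple[int, int]  # inclusive (lo, hi) over integer join keys


def merge_to_limit(ranges: List[Range], max_ranges: int) -> List[Range]:
    """Return at most max_ranges ranges that together cover every input range."""
    if max_ranges < 1:
        raise ValueError("max_ranges must be at least 1")
    # Step 1: sort, then coalesce ranges that overlap or touch (integer keys).
    merged: List[Range] = []
    for lo, hi in sorted(ranges):
        if merged and lo <= merged[-1][1] + 1:
            merged[-1] = (merged[-1][0], max(merged[-1][1], hi))
        else:
            merged.append((lo, hi))
    # Step 2: while over the limit, fuse the two neighbours with the smallest
    # gap, since that adds the fewest non-matching keys to the filter.
    while len(merged) > max_ranges:
        i = min(range(len(merged) - 1), key=lambda j: merged[j + 1][0] - merged[j][1])
        merged[i : i + 2] = [(merged[i][0], merged[i + 1][1])]
    return merged


points = [(1, 1), (5, 5), (100, 120), (2999998, 2999998)]
print(merge_to_limit(points, 4))  # under the limit: ranges are kept as-is
print(merge_to_limit(points, 2))  # small gaps are closed first
print(merge_to_limit(points, 1))  # degenerates to a single min/max range
```

With N = 1 this falls back to the current single min/max behavior, so the limit acts as a knob between filter size and selectivity. The loop is quadratic in the number of ranges, which is likely acceptable only for small inputs; a heap of gaps would be a natural refinement.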
