nuno-faria commented on code in PR #17201:
URL: https://github.com/apache/datafusion/pull/17201#discussion_r2284559288
##########
datafusion/sqllogictest/test_files/push_down_filter.slt:
##########
@@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123';
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
+# Test dynamic filter pushdown with swapped join inputs (issue #17196)
+# Create tables with different sizes to force join input swapping
+statement ok
+copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
+
+statement ok
+create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
+
+# Test that dynamic filter is applied to the correct table after join input swapping
+# The small_table should be the build side, large_table should be the probe side with dynamic filter
+query TT
+explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
+04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
Review Comment:
I see, so the filter is always applied from the build side to the probe
side, independently of the query. Maybe I misunderstood, but I thought that was
the original issue (https://github.com/apache/datafusion/issues/17196): that
the filter was always applied to the same side, even when it made sense to do
the opposite.
For example:
```sql
copy (select i as k from generate_series(1, 1000000) t(i)) to 't1.parquet';
copy (select i as k, i as v from generate_series(1, 10000000) t(i)) to 't2.parquet';
create external table t1 stored as parquet location 't1.parquet';
create external table t2 stored as parquet location 't2.parquet';
explain analyze select *
from t1
join t2 on t1.k = t2.k
where t2.v >= 1000000;
-- plan_type: Plan with Metrics
CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=71.205µs]
  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 14 AND k@0 <= 999997], metrics=[output_rows=1, elapsed_compute=740.259412ms, build_input_batches=120, build_input_rows=1000000, input_batches=12, input_rows=11713, output_batches=12, build_mem_used=34742816, build_time=703.6063ms, join_time=7.0188ms]
    CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1000000, elapsed_compute=15.225303ms]
      RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=1.6347235s, repartition_time=105.8625ms, send_time=8.7999ms]
        RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=135.86ms, repartition_time=1ns, send_time=1.4348ms]
          DataSourceExec: file_groups={1 group: [[t1.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=1000000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=1310405, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=120.801µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=232.8µs, time_elapsed_processing=134.8576ms, time_elapsed_scanning_total=145.4359ms, time_elapsed_scanning_until_data=17.3163ms]
    CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=244.4µs]
      RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=55.1884ms, repartition_time=1.360111ms, send_time=123.432µs]
        CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=107.3µs]
          FilterExec: v@1 >= 1000000, metrics=[output_rows=11713, elapsed_compute=681.411µs]
            DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 1000000 AND DynamicFilterPhysicalExpr [ k@0 >= 14 AND k@0 <= 999997 ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 1000000 AND k_null_count@4 != row_count@2 AND k_max@3 >= 14 AND k_null_count@4 != row_count@2 AND k_min@5 <= 999997, required_guarantees=[]
            , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=379500, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=180.612µs, metadata_load_time=7.630712ms, page_index_eval_time=317.512µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.603512ms, time_elapsed_opening=16.959ms, time_elapsed_processing=53.3003ms, time_elapsed_scanning_total=39.3134ms, time_elapsed_scanning_until_data=36.7037ms]
```
In this example `t2` is filtered by `v` and by a dynamic filter on `k`,
while in theory it would be faster to apply a dynamic filter from `t2` to `t1`
(maybe we would need some heuristic to determine which would be the best
approach?).
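
To make the heuristic idea a bit more concrete, here is a rough, standalone sketch of one possible rule: build the dynamic filter from whichever input is expected to produce fewer rows after its own static predicates, and push it into the other input. Everything in it (`InputEstimate`, `FilterTarget`, `choose_filter_target`) is hypothetical and is not an existing DataFusion type or API, and the row counts are made-up illustrative values rather than numbers taken from the plan above.

```rust
/// Which join input the dynamic filter would be pushed into.
/// Hypothetical type, not part of DataFusion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterTarget {
    Left,
    Right,
}

/// Hypothetical per-input estimate: rows expected to survive the
/// input's own static predicates (e.g. a `WHERE` clause on that table).
#[derive(Debug, Clone, Copy)]
struct InputEstimate {
    estimated_rows: u64,
}

/// Push the dynamic filter into the input expected to produce MORE rows,
/// so that the filter is built from the smaller input.
/// Ties default to the right input (the usual probe side).
fn choose_filter_target(left: InputEstimate, right: InputEstimate) -> FilterTarget {
    if left.estimated_rows > right.estimated_rows {
        FilterTarget::Left
    } else {
        FilterTarget::Right
    }
}

fn main() {
    // Illustrative values only: the left input is unfiltered, while the
    // right input is assumed to be reduced heavily by a selective predicate.
    let left = InputEstimate { estimated_rows: 5_000_000 };
    let right = InputEstimate { estimated_rows: 20_000 };

    // The left input is larger, so the filter would target it.
    println!("{:?}", choose_filter_target(left, right)); // prints "Left"
}
```

A real implementation would presumably also have to account for how reliable the estimates are and for which side the join actually builds its hash table on, so this is only meant to illustrate the kind of decision involved.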
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]