adriangb commented on code in PR #17201:
URL: https://github.com/apache/datafusion/pull/17201#discussion_r2280521294
##########
datafusion/sqllogictest/test_files/push_down_filter.slt:
##########
@@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123';
physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
+# Test dynamic filter pushdown with swapped join inputs (issue #17196)
+# Create tables with different sizes to force join input swapping
+statement ok
+copy (select i as k from generate_series(1, 100) t(i)) to
'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+copy (select i as k, i as v from generate_series(1, 1000) t(i)) to
'test_files/scratch/push_down_filter/large_table.parquet';
+
+statement ok
+create external table small_table stored as parquet location
'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+create external table large_table stored as parquet location
'test_files/scratch/push_down_filter/large_table.parquet';
+
+# Test that dynamic filter is applied to the correct table after join input
swapping
+# The small_table should be the build side, large_table should be the probe
side with dynamic filter
+query TT
+explain select * from small_table join large_table on small_table.k =
large_table.k where large_table.v >= 50;
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]},
projection=[k], file_type=parquet
+04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]},
projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND
DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 !=
row_count@2 AND v_max@0 >= 50, required_guarantees=[]
Review Comment:
But I'm not so sure about your original point: my understanding is that in
DataFusion _at runtime_ (i.e. when `ExecutionPlan::execute` is called and work
actually begins) the left side is always the build side and the right side is
always the probe side. During optimizer passes which one is left and right may
be swapped / re-arranged but all of the dynamic filter stuff happens after this
so we can always push the filters into the right side. At least for inner
joins, other join types may be more complex and I haven't even begun to wrap my
head around those cases.
##########
datafusion/sqllogictest/test_files/push_down_filter.slt:
##########
@@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123';
physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
+# Test dynamic filter pushdown with swapped join inputs (issue #17196)
+# Create tables with different sizes to force join input swapping
+statement ok
+copy (select i as k from generate_series(1, 100) t(i)) to
'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+copy (select i as k, i as v from generate_series(1, 1000) t(i)) to
'test_files/scratch/push_down_filter/large_table.parquet';
+
+statement ok
+create external table small_table stored as parquet location
'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+create external table large_table stored as parquet location
'test_files/scratch/push_down_filter/large_table.parquet';
+
+# Test that dynamic filter is applied to the correct table after join input
swapping
+# The small_table should be the build side, large_table should be the probe
side with dynamic filter
+query TT
+explain select * from small_table join large_table on small_table.k =
large_table.k where large_table.v >= 50;
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
+03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]},
projection=[k], file_type=parquet
+04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]},
projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND
DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 !=
row_count@2 AND v_max@0 >= 50, required_guarantees=[]
Review Comment:
But I'm not so sure about your original point: my understanding is that _at
runtime_ (i.e. when `ExecutionPlan::execute` is called and work actually
begins) the left side is always the build side and the right side is always the
probe side. During optimizer passes which one is left and right may be swapped
/ re-arranged but all of the dynamic filter stuff happens after this so we can
always push the filters into the right side. At least for inner joins, other
join types may be more complex and I haven't even begun to wrap my head around
those cases.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]