This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bc600b3090 Split `push_down_filter.slt` into standalone sqllogictest files to reduce long-tail runtime (#20566)
bc600b3090 is described below
commit bc600b3090d783a20eef0760098ddfa9d6ec8595
Author: kosiew <[email protected]>
AuthorDate: Fri Feb 27 16:36:31 2026 +0800
Split `push_down_filter.slt` into standalone sqllogictest files to reduce long-tail runtime (#20566)
## Which issue does this PR close?
* Part of #20524
## Rationale for this change
`datafusion/sqllogictest/test_files/push_down_filter.slt` had grown into
a large sqllogictest file. Since the sqllogictest runner parallelizes at
**file granularity**, a single heavyweight file can become a straggler
and dominate wall-clock time.
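To make the straggler effect concrete, here is a minimal scheduling sketch
(illustrative only: the durations are made up and `makespan` is a
hypothetical helper, not part of the runner). Whole files are assigned to
worker threads, so wall-clock time can never drop below the duration of the
largest single file:
```
import heapq

def makespan(file_secs, workers):
    """Greedy longest-first assignment of whole files to workers."""
    loads = [0.0] * workers
    heapq.heapify(loads)
    for secs in sorted(file_secs, reverse=True):
        heapq.heappush(loads, heapq.heappop(loads) + secs)
    return max(loads)

# One 60s monolith among twelve 5s files pins the suite at 60s...
print(makespan([60] + [5] * 12, workers=4))      # -> 60.0
# ...while splitting it into four 15s files restores balance.
print(makespan([15] * 4 + [5] * 12, workers=4))  # -> 30.0
```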
This PR performs a non-invasive split of that file into smaller,
self-contained `.slt` files so the runner can distribute work more
evenly across threads, improving overall suite balance without changing
SQL semantics or test coverage.
## What changes are included in this PR?
* Removed the monolithic `push_down_filter.slt`.
* Added new standalone sqllogictest files, each with the minimal
setup/teardown required to run independently:
* `push_down_filter_unnest.slt` — unnest filter pushdown coverage
(including struct/field cases).
* `push_down_filter_parquet.slt` — parquet filter pushdown + limit +
cast predicate behavior + dynamic filter pushdown (swapped join inputs).
* `push_down_filter_outer_joins.slt` — LEFT/RIGHT join and anti-join
logical filter pushdown checks.
* `push_down_filter_regression.slt` — regression coverage for issues
#17188 and #17512, plus aggregate dynamic filter pushdown checks.
* Updated scratch output paths to be file-scoped (e.g.
`test_files/scratch/push_down_filter_parquet/...`) to reduce the chance
of conflicts when tests execute in parallel (see the sketch after this list).
* Preserved all original query expectations and explain-plan assertions;
changes are organizational only.
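A sketch of the file-scoped scratch-path convention mentioned above
(`scratch_dir` is a hypothetical helper for illustration; the real paths are
spelled out in the individual `.slt` files). Deriving the scratch directory
from the `.slt` file name gives every file a disjoint output location, so
parallel runs cannot write to the same path:
```
from pathlib import Path

def scratch_dir(slt_file: str) -> Path:
    # push_down_filter_parquet.slt -> test_files/scratch/push_down_filter_parquet
    return Path("test_files/scratch") / Path(slt_file).stem

assert scratch_dir("push_down_filter_parquet.slt") == Path(
    "test_files/scratch/push_down_filter_parquet"
)
```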
## Are these changes tested?
Yes. The split was verified with a Python script that compares the
normalized query/statement blocks in the new `.slt` files against the old
monolithic file:
```
python - <<'PY'
import subprocess, re
from collections import defaultdict, deque

repo = '.'
old_spec = '692a7cb67^:datafusion/sqllogictest/test_files/push_down_filter.slt'
new_specs = [
    'HEAD:datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt',
    'HEAD:datafusion/sqllogictest/test_files/push_down_filter_parquet.slt',
    'HEAD:datafusion/sqllogictest/test_files/push_down_filter_regression.slt',
    'HEAD:datafusion/sqllogictest/test_files/push_down_filter_unnest.slt',
]

def git_show(spec):
    return subprocess.check_output(['git', '-C', repo, 'show', spec], text=True)

def normalize_sql(sql):
    # Lowercase, normalize the file-scoped scratch paths, and collapse
    # whitespace so old and new blocks compare equal.
    s = sql.strip().lower()
    s = re.sub(
        r"test_files/scratch/push_down_filter(?:_[^'\s;)/]+)?[^'\s;)]*",
        "test_files/scratch/__norm__",
        s,
    )
    s = re.sub(r'\s+', ' ', s)
    return s

def blocks(text):
    # Extract ('query'|'statement', normalized_sql) pairs from an .slt file.
    lines = text.splitlines()
    out = []
    i = 0
    while i < len(lines):
        m = lines[i].strip()
        if m.startswith('query '):
            i += 1
            b = []
            while i < len(lines) and lines[i].strip() != '----':
                if not lines[i].lstrip().startswith('#'):
                    b.append(lines[i])
                i += 1
            sql = '\n'.join(b).strip()
            if sql:
                out.append(('query', normalize_sql(sql)))
        elif m.startswith('statement '):
            i += 1
            b = []
            while i < len(lines):
                s = lines[i].strip()
                if s == '':
                    break
                if s.startswith('query ') or s.startswith('statement '):
                    i -= 1
                    break
                if not lines[i].lstrip().startswith('#'):
                    b.append(lines[i])
                i += 1
            sql = '\n'.join(b).strip()
            if sql:
                out.append(('statement', normalize_sql(sql)))
        i += 1
    return out

old_blocks = blocks(git_show(old_spec))
new_blocks = []
for s in new_specs:
    new_blocks.extend(blocks(git_show(s)))

# Multiset comparison: every old block must be matched by a new block.
q = defaultdict(deque)
for item in new_blocks:
    q[item].append(item)
missing = 0
extra = 0
for item in old_blocks:
    if q[item]:
        q[item].popleft()
    else:
        missing += 1
for v in q.values():
    extra += len(v)

print(f'old_blocks={len(old_blocks)}')
print(f'new_blocks={len(new_blocks)}')
print(f'missing={missing}')
print(f'extra={extra}')
print(f'baseline={old_spec}')
if missing != 0:
    raise SystemExit(1)
PY
```
Output:
```
old_blocks=107
new_blocks=108
missing=0
extra=1
```
The one extra block is the statement
`set datafusion.explain.physical_plan_only = true;`.
It shows as extra because each split file carries its own setup: in the
split files it appears 3 times:
* `push_down_filter_parquet.slt:21`
* `push_down_filter_unnest.slt:21`
* `push_down_filter_regression.slt:129`
In the baseline monolithic file at e937cadbc^ it appears 2 times, so the
comparison reports 3 - 2 = 1 extra.
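The same accounting can be reproduced with a small `Counter`-based multiset
diff (a minimal sketch of the comparison logic above, fed only the one
duplicated block):
```
from collections import Counter

block = ('statement', 'set datafusion.explain.physical_plan_only = true;')
old = Counter({block: 2})   # occurrences in the baseline monolithic file
new = Counter({block: 3})   # occurrences across the split files
missing = sum((old - new).values())  # old blocks with no match in the split
extra = sum((new - old).values())    # split blocks beyond the baseline count
print(missing, extra)                # -> 0 1
```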
## Are there any user-facing changes?
No user-facing behavior changes. This is a test-suite
organization/performance improvement only.
## Note before merging
Revert e8369bb (a commit added only to trigger the extended CI tests for
sqllogictest).
## LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated
content has been manually reviewed and tested.
---
.../sqllogictest/test_files/push_down_filter.slt | 745 ---------------------
.../test_files/push_down_filter_outer_joins.slt | 264 ++++++++
.../test_files/push_down_filter_parquet.slt | 188 ++++++
.../test_files/push_down_filter_regression.slt | 200 ++++++
.../test_files/push_down_filter_unnest.slt | 148 ++++
5 files changed, 800 insertions(+), 745 deletions(-)
diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt
b/datafusion/sqllogictest/test_files/push_down_filter.slt
deleted file mode 100644
index edafcfaa54..0000000000
--- a/datafusion/sqllogictest/test_files/push_down_filter.slt
+++ /dev/null
@@ -1,745 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Test push down filter
-
-statement ok
-set datafusion.explain.physical_plan_only = true;
-
-statement ok
-CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]);
-
-query I
-select uc2 from (select unnest(column2) as uc2, column1 from v) where column1
= 2;
-----
-3
-4
-5
-
-# test push down filter for unnest with filter on non-unnest column
-# filter plan is pushed down into projection plan
-query TT
-explain select uc2 from (select unnest(column2) as uc2, column1 from v) where
column1 = 2;
-----
-physical_plan
-01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
-02)--UnnestExec
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
-05)--------FilterExec: column1@0 = 2, projection=[column2@1]
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-
-query I
-select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
-----
-4
-5
-
-# test push down filter for unnest with filter on unnest column
-query TT
-explain select uc2 from (select unnest(column2) as uc2, column1 from v) where
uc2 > 3;
-----
-physical_plan
-01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
-02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------UnnestExec
-05)--------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-
-query II
-select uc2, column1 from (select unnest(column2) as uc2, column1 from v)
where uc2 > 3 AND column1 = 2;
-----
-4 2
-5 2
-
-# Could push the filter (column1 = 2) down below unnest
-query TT
-explain select uc2, column1 from (select unnest(column2) as uc2, column1 from
v) where uc2 > 3 AND column1 = 2;
-----
-physical_plan
-01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2,
column1@1 as column1]
-02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
-03)----UnnestExec
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2),
column1@0 as column1]
-06)----------FilterExec: column1@0 = 2
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-
-query II
-select uc2, column1 from (select unnest(column2) as uc2, column1 from v)
where uc2 > 3 OR column1 = 2;
-----
-3 2
-4 2
-5 2
-
-# only non-unnest filter in AND clause could be pushed down
-query TT
-explain select uc2, column1 from (select unnest(column2) as uc2, column1 from
v) where uc2 > 3 OR column1 = 2;
-----
-physical_plan
-01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2,
column1@1 as column1]
-02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 OR column1@1 = 2
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------UnnestExec
-05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2),
column1@0 as column1]
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-
-statement ok
-drop table v;
-
-# test with unnest struct, should not push down filter
-statement ok
-CREATE TABLE d AS VALUES(1,[named_struct('a', 1, 'b',
2)]),(2,[named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6)]);
-
-query I?
-select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
-----
-1 {a: 1, b: 2}
-
-query TT
-explain select * from (select column1, unnest(column2) as o from d) where
o['a'] = 1;
-----
-physical_plan
-01)ProjectionExec: expr=[column1@0 as column1,
__unnest_placeholder(d.column2,depth=1)@1 as o]
-02)--FilterExec: __datafusion_extracted_1@0 = 1, projection=[column1@1,
__unnest_placeholder(d.column2,depth=1)@2]
-03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------ProjectionExec:
expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as
__datafusion_extracted_1, column1@0 as column1,
__unnest_placeholder(d.column2,depth=1)@1 as
__unnest_placeholder(d.column2,depth=1)]
-05)--------UnnestExec
-06)----------ProjectionExec: expr=[column1@0 as column1, column2@1 as
__unnest_placeholder(d.column2)]
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-
-statement ok
-drop table d;
-
-statement ok
-CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3,
'b', 4)), (named_struct('a', 5, 'b', 6));
-
-query II
-select * from (select unnest(column1) from d) where
"__unnest_placeholder(d.column1).b" > 5;
-----
-5 6
-
-query TT
-explain select * from (select unnest(column1) from d) where
"__unnest_placeholder(d.column1).b" > 5;
-----
-physical_plan
-01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5
-02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----UnnestExec
-04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)]
-05)--------DataSourceExec: partitions=1, partition_sizes=[1]
-
-statement ok
-drop table d;
-
-# Test push down filter with limit for parquet
-statement ok
-set datafusion.execution.parquet.pushdown_filters = true;
-
-# this one is also required to make DF skip second file due to "sufficient"
amount of rows
-statement ok
-set datafusion.execution.collect_statistics = true;
-
-# Create a table as a data source
-statement ok
-CREATE TABLE src_table (
- part_key INT,
- value INT
-) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4),
(3, 5), (3, 6);
-
-
-# There will be more than 2 records filtered from the table to check that
`limit 1` actually applied.
-# Setup 3 files, i.e., as many as there are partitions:
-
-# File 1:
-query I
-COPY (SELECT * FROM src_table where part_key = 1)
-TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-0.parquet'
-STORED AS PARQUET;
-----
-3
-
-# File 2:
-query I
-COPY (SELECT * FROM src_table where part_key = 2)
-TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-1.parquet'
-STORED AS PARQUET;
-----
-4
-
-# File 3:
-query I
-COPY (SELECT * FROM src_table where part_key = 3)
-TO 'test_files/scratch/push_down_filter/test_filter_with_limit/part-2.parquet'
-STORED AS PARQUET;
-----
-3
-
-statement ok
-CREATE EXTERNAL TABLE test_filter_with_limit
-(
- part_key INT,
- value INT
-)
-STORED AS PARQUET
-LOCATION 'test_files/scratch/push_down_filter/test_filter_with_limit/';
-
-query TT
-explain select * from test_filter_with_limit where value = 2 limit 1;
-----
-physical_plan
-01)CoalescePartitionsExec: fetch=1
-02)--DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-0.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/test_filter_with_limit/part-2.parquet]]},
projection=[part_key, value], limit=1, file_type=parquet, predicate=value@1 =
2, pruning_predicat [...]
-
-query II
-select * from test_filter_with_limit where value = 2 limit 1;
-----
-2 2
-
-
-# Tear down test_filter_with_limit table:
-statement ok
-DROP TABLE test_filter_with_limit;
-
-# Tear down src_table table:
-statement ok
-DROP TABLE src_table;
-
-
-query I
-COPY (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10))
-TO 'test_files/scratch/push_down_filter/t.parquet'
-STORED AS PARQUET;
-----
-10
-
-statement ok
-CREATE EXTERNAL TABLE t
-(
- a INT
-)
-STORED AS PARQUET
-LOCATION 'test_files/scratch/push_down_filter/t.parquet';
-
-
-# The predicate should not have a column cast when the value is a valid i32
-query TT
-explain select a from t where a = '100';
-----
-physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
projection=[a], file_type=parquet, predicate=a@0 = 100,
pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <=
a_max@1, required_guarantees=[a in (100)]
-
-# The predicate should not have a column cast when the value is a valid i32
-query TT
-explain select a from t where a != '100';
-----
-physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
projection=[a], file_type=parquet, predicate=a@0 != 100,
pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 !=
a_max@1), required_guarantees=[a not in (100)]
-
-# The predicate should still have the column cast when the value is a NOT
valid i32
-query TT
-explain select a from t where a = '99999999999';
-----
-physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999
-
-# The predicate should still have the column cast when the value is a NOT
valid i32
-query TT
-explain select a from t where a = '99.99';
-----
-physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99
-
-# The predicate should still have the column cast when the value is a NOT
valid i32
-query TT
-explain select a from t where a = '';
-----
-physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) =
-
-# The predicate should not have a column cast when the operator is = or != and
the literal can be round-trip casted without losing information.
-query TT
-explain select a from t where cast(a as string) = '100';
-----
-physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
projection=[a], file_type=parquet, predicate=a@0 = 100,
pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <=
a_max@1, required_guarantees=[a in (100)]
-
-# The predicate should still have the column cast when the literal alters its
string representation after round-trip casting (leading zero lost).
-query TT
-explain select a from t where CAST(a AS string) = '0123';
-----
-physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
-
-
-# Test dynamic filter pushdown with swapped join inputs (issue #17196)
-# Create tables with different sizes to force join input swapping
-statement ok
-copy (select i as k from generate_series(1, 100) t(i)) to
'test_files/scratch/push_down_filter/small_table.parquet';
-
-statement ok
-copy (select i as k, i as v from generate_series(1, 1000) t(i)) to
'test_files/scratch/push_down_filter/large_table.parquet';
-
-statement ok
-create external table small_table stored as parquet location
'test_files/scratch/push_down_filter/small_table.parquet';
-
-statement ok
-create external table large_table stored as parquet location
'test_files/scratch/push_down_filter/large_table.parquet';
-
-# Test that dynamic filter is applied to the correct table after join input
swapping
-# The small_table should be the build side, large_table should be the probe
side with dynamic filter
-query TT
-explain select * from small_table join large_table on small_table.k =
large_table.k where large_table.v >= 50;
-----
-physical_plan
-01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
-02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]},
projection=[k], file_type=parquet
-03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]},
projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [
empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50,
required_guarantees=[]
-
-statement ok
-drop table small_table;
-
-statement ok
-drop table large_table;
-
-statement ok
-drop table t;
-
-# Regression test for https://github.com/apache/datafusion/issues/17188
-query I
-COPY (select i as k from generate_series(1, 10000000) as t(i))
-TO 'test_files/scratch/push_down_filter/t1.parquet'
-STORED AS PARQUET;
-----
-10000000
-
-query I
-COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
-TO 'test_files/scratch/push_down_filter/t2.parquet'
-STORED AS PARQUET;
-----
-10000000
-
-statement ok
-create external table t1 stored as parquet location
'test_files/scratch/push_down_filter/t1.parquet';
-
-statement ok
-create external table t2 stored as parquet location
'test_files/scratch/push_down_filter/t2.parquet';
-
-# The failure before https://github.com/apache/datafusion/pull/17197 was
non-deterministic and random
-# So we'll run the same query a couple of times just to have more certainty
it's fixed
-# Sorry about the spam in this slt test...
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-query III rowsort
-select *
-from t1
-join t2 on t1.k = t2.k
-where v = 1 or v = 10000000
-order by t1.k, t2.v;
-----
-1 1 1
-10000000 10000000 10000000
-
-# Regression test for https://github.com/apache/datafusion/issues/17512
-
-query I
-COPY (
- SELECT arrow_cast('2025-01-01T00:00:00Z'::timestamptz,
'Timestamp(Microsecond, Some("UTC"))') AS start_timestamp
-)
-TO 'test_files/scratch/push_down_filter/17512.parquet'
-STORED AS PARQUET;
-----
-1
-
-statement ok
-CREATE EXTERNAL TABLE records STORED AS PARQUET LOCATION
'test_files/scratch/push_down_filter/17512.parquet';
-
-query I
-SELECT 1
-FROM (
- SELECT start_timestamp
- FROM records
- WHERE start_timestamp <= '2025-01-01T00:00:00Z'::timestamptz
-) AS t
-WHERE t.start_timestamp::time < '00:00:01'::time;
-----
-1
-
-# Test aggregate dynamic filter pushdown
-# Note: most of the test coverage lives in
`datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs`
-# , to compare dynamic filter content easier. Here the tests are simple
end-to-end
-# exercises.
-
-statement ok
-set datafusion.explain.format = 'indent';
-
-statement ok
-set datafusion.explain.physical_plan_only = true;
-
-statement ok
-set datafusion.execution.target_partitions = 2;
-
-statement ok
-set datafusion.execution.parquet.pushdown_filters = true;
-
-statement ok
-set datafusion.optimizer.enable_dynamic_filter_pushdown = true;
-
-statement ok
-set datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
-
-statement ok
-create external table agg_dyn_test stored as parquet location
'../core/tests/data/test_statistics_per_partition';
-
-# Expect dynamic filter available inside data source
-query TT
-explain select max(id) from agg_dyn_test where id > 1;
-----
-physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
-04)------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2
[...]
-
-query I
-select max(id) from agg_dyn_test where id > 1;
-----
-4
-
-# Expect dynamic filter available inside data source
-query TT
-explain select max(id) from agg_dyn_test where (id+1) > 1;
-----
-physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
-04)------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2
[...]
-
-# Expect dynamic filter available inside data source
-query TT
-explain select max(id), min(id) from agg_dyn_test where id < 10;
-----
-physical_plan
-01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id),
min(agg_dyn_test.id)]
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id),
min(agg_dyn_test.id)]
-04)------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2
[...]
-
-# Dynamic filter should not be available for grouping sets
-query TT
-explain select max(id) from agg_dyn_test where id < 10
-group by grouping sets ((), (id))
-----
-physical_plan
-01)ProjectionExec: expr=[max(agg_dyn_test.id)@2 as max(agg_dyn_test.id)]
-02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as
__grouping_id], aggr=[max(agg_dyn_test.id)]
-03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2),
input_partitions=2
-04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)],
aggr=[max(agg_dyn_test.id)]
-05)--------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQ
[...]
-
-statement ok
-drop table agg_dyn_test;
-
-statement ok
-drop table t1;
-
-statement ok
-drop table t2;
-
-
-
-# check LEFT/RIGHT joins with filter pushdown to both relations (when possible)
-
-statement ok
-create table t1(k int, v int);
-
-statement ok
-create table t2(k int, v int);
-
-statement ok
-insert into t1 values
- (1, 10),
- (2, 20),
- (3, 30),
- (null, 40),
- (50, null),
- (null, null);
-
-statement ok
-insert into t2 values
- (1, 11),
- (2, 21),
- (2, 22),
- (null, 41),
- (51, null),
- (null, null);
-
-statement ok
-set datafusion.explain.physical_plan_only = false;
-
-statement ok
-set datafusion.explain.logical_plan_only = true;
-
-
-# left join + filter on join key -> pushed
-query TT
-explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
-----
-logical_plan
-01)Left Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--Filter: t2.k > Int32(1)
-05)----TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
-----
-2 20 2 21
-2 20 2 22
-3 30 NULL NULL
-50 NULL NULL NULL
-
-# left join + filter on another column -> not pushed
-query TT
-explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
-----
-logical_plan
-01)Left Join: t1.k = t2.k
-02)--Filter: t1.v > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
-----
-1 10 1 11
-2 20 2 21
-2 20 2 22
-3 30 NULL NULL
-NULL 40 NULL NULL
-
-# left join + or + filter on another column -> not pushed
-query TT
-explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v >
20;
-----
-logical_plan
-01)Left Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
-----
-3 30 NULL NULL
-50 NULL NULL NULL
-NULL 40 NULL NULL
-
-
-# right join + filter on join key -> pushed
-query TT
-explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
-----
-logical_plan
-01)Inner Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--Filter: t2.k > Int32(1)
-05)----TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
-----
-2 20 2 21
-2 20 2 22
-
-# right join + filter on another column -> not pushed
-query TT
-explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
-----
-logical_plan
-01)Inner Join: t1.k = t2.k
-02)--Filter: t1.v > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
-----
-1 10 1 11
-2 20 2 21
-2 20 2 22
-
-# right join + or + filter on another column -> not pushed
-query TT
-explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v >
20;
-----
-logical_plan
-01)Inner Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k, v]
-
-query IIII rowsort
-select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
-----
-
-
-# left anti join + filter on join key -> pushed
-query TT
-explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
-----
-logical_plan
-01)LeftAnti Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--Filter: t2.k > Int32(1)
-05)----TableScan: t2 projection=[k]
-
-query II rowsort
-select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
-----
-3 30
-50 NULL
-
-# left anti join + filter on another column -> not pushed
-query TT
-explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
-----
-logical_plan
-01)LeftAnti Join: t1.k = t2.k
-02)--Filter: t1.v > Int32(1)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k]
-
-query II rowsort
-select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
-----
-3 30
-NULL 40
-
-# left anti join + or + filter on another column -> not pushed
-query TT
-explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or
t1.v > 20;
-----
-logical_plan
-01)LeftAnti Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
-03)----TableScan: t1 projection=[k, v]
-04)--TableScan: t2 projection=[k]
-
-query II rowsort
-select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
-----
-3 30
-50 NULL
-NULL 40
-
-
-# right anti join + filter on join key -> pushed
-query TT
-explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
-----
-logical_plan
-01)RightAnti Join: t1.k = t2.k
-02)--Filter: t1.k > Int32(1)
-03)----TableScan: t1 projection=[k]
-04)--Filter: t2.k > Int32(1)
-05)----TableScan: t2 projection=[k, v]
-
-query II rowsort
-select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
-----
-51 NULL
-
-# right anti join + filter on another column -> not pushed
-query TT
-explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
-----
-logical_plan
-01)RightAnti Join: t1.k = t2.k
-02)--TableScan: t1 projection=[k]
-03)--Filter: t2.v > Int32(1)
-04)----TableScan: t2 projection=[k, v]
-
-query II rowsort
-select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
-----
-NULL 41
-
-# right anti join + or + filter on another column -> not pushed
-query TT
-explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or
t2.v > 20;
-----
-logical_plan
-01)RightAnti Join: t1.k = t2.k
-02)--TableScan: t1 projection=[k]
-03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20)
-04)----TableScan: t2 projection=[k, v]
-
-query II rowsort
-select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
-----
-51 NULL
-NULL 41
-
-
-statement ok
-set datafusion.explain.logical_plan_only = false;
-
-statement ok
-drop table t1;
-
-statement ok
-drop table t2;
diff --git
a/datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt
b/datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt
new file mode 100644
index 0000000000..2e5f7c317f
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/push_down_filter_outer_joins.slt
@@ -0,0 +1,264 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test push down filter
+
+# check LEFT/RIGHT joins with filter pushdown to both relations (when possible)
+
+statement ok
+create table t1(k int, v int);
+
+statement ok
+create table t2(k int, v int);
+
+statement ok
+insert into t1 values
+ (1, 10),
+ (2, 20),
+ (3, 30),
+ (null, 40),
+ (50, null),
+ (null, null);
+
+statement ok
+insert into t2 values
+ (1, 11),
+ (2, 21),
+ (2, 22),
+ (null, 41),
+ (51, null),
+ (null, null);
+
+statement ok
+set datafusion.explain.physical_plan_only = false;
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+
+# left join + filter on join key -> pushed
+query TT
+explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
+----
+logical_plan
+01)Left Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--Filter: t2.k > Int32(1)
+05)----TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
+----
+2 20 2 21
+2 20 2 22
+3 30 NULL NULL
+50 NULL NULL NULL
+
+# left join + filter on another column -> not pushed
+query TT
+explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
+----
+logical_plan
+01)Left Join: t1.k = t2.k
+02)--Filter: t1.v > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
+----
+1 10 1 11
+2 20 2 21
+2 20 2 22
+3 30 NULL NULL
+NULL 40 NULL NULL
+
+# left join + or + filter on another column -> not pushed
+query TT
+explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v >
20;
+----
+logical_plan
+01)Left Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
+----
+3 30 NULL NULL
+50 NULL NULL NULL
+NULL 40 NULL NULL
+
+
+# right join + filter on join key -> pushed
+query TT
+explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
+----
+logical_plan
+01)Inner Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--Filter: t2.k > Int32(1)
+05)----TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
+----
+2 20 2 21
+2 20 2 22
+
+# right join + filter on another column -> not pushed
+query TT
+explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
+----
+logical_plan
+01)Inner Join: t1.k = t2.k
+02)--Filter: t1.v > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
+----
+1 10 1 11
+2 20 2 21
+2 20 2 22
+
+# right join + or + filter on another column -> not pushed
+query TT
+explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v >
20;
+----
+logical_plan
+01)Inner Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k, v]
+
+query IIII rowsort
+select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
+----
+
+
+# left anti join + filter on join key -> pushed
+query TT
+explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
+----
+logical_plan
+01)LeftAnti Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--Filter: t2.k > Int32(1)
+05)----TableScan: t2 projection=[k]
+
+query II rowsort
+select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
+----
+3 30
+50 NULL
+
+# left anti join + filter on another column -> not pushed
+query TT
+explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
+----
+logical_plan
+01)LeftAnti Join: t1.k = t2.k
+02)--Filter: t1.v > Int32(1)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k]
+
+query II rowsort
+select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
+----
+3 30
+NULL 40
+
+# left anti join + or + filter on another column -> not pushed
+query TT
+explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or
t1.v > 20;
+----
+logical_plan
+01)LeftAnti Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
+03)----TableScan: t1 projection=[k, v]
+04)--TableScan: t2 projection=[k]
+
+query II rowsort
+select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
+----
+3 30
+50 NULL
+NULL 40
+
+
+# right anti join + filter on join key -> pushed
+query TT
+explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
+----
+logical_plan
+01)RightAnti Join: t1.k = t2.k
+02)--Filter: t1.k > Int32(1)
+03)----TableScan: t1 projection=[k]
+04)--Filter: t2.k > Int32(1)
+05)----TableScan: t2 projection=[k, v]
+
+query II rowsort
+select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
+----
+51 NULL
+
+# right anti join + filter on another column -> not pushed
+query TT
+explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
+----
+logical_plan
+01)RightAnti Join: t1.k = t2.k
+02)--TableScan: t1 projection=[k]
+03)--Filter: t2.v > Int32(1)
+04)----TableScan: t2 projection=[k, v]
+
+query II rowsort
+select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
+----
+NULL 41
+
+# right anti join + or + filter on another column -> not pushed
+query TT
+explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or
t2.v > 20;
+----
+logical_plan
+01)RightAnti Join: t1.k = t2.k
+02)--TableScan: t1 projection=[k]
+03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20)
+04)----TableScan: t2 projection=[k, v]
+
+query II rowsort
+select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
+----
+51 NULL
+NULL 41
+
+
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+statement ok
+drop table t1;
+
+statement ok
+drop table t2;
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt
b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt
new file mode 100644
index 0000000000..e1c83c8c33
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test push down filter
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+# Test push down filter with limit for parquet
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+# this one is also required to make DF skip second file due to "sufficient"
amount of rows
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+ part_key INT,
+ value INT
+) AS VALUES(1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100), (3, 4),
(3, 5), (3, 6);
+
+
+# There will be more than 2 records filtered from the table to check that
`limit 1` actually applied.
+# Setup 3 files, i.e., as many as there are partitions:
+
+# File 1:
+query I
+COPY (SELECT * FROM src_table where part_key = 1)
+TO
'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-0.parquet'
+STORED AS PARQUET;
+----
+3
+
+# File 2:
+query I
+COPY (SELECT * FROM src_table where part_key = 2)
+TO
'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-1.parquet'
+STORED AS PARQUET;
+----
+4
+
+# File 3:
+query I
+COPY (SELECT * FROM src_table where part_key = 3)
+TO
'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-2.parquet'
+STORED AS PARQUET;
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE test_filter_with_limit
+(
+ part_key INT,
+ value INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/push_down_filter_parquet/test_filter_with_limit/';
+
+query TT
+explain select * from test_filter_with_limit where value = 2 limit 1;
+----
+physical_plan
+01)CoalescePartitionsExec: fetch=1
+02)--DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-0.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/test_filter_with_limit/part-2.parquet]]},
projection=[part_key, value], limit=1, file_type=parquet, predicate=value [...]
+
+query II
+select * from test_filter_with_limit where value = 2 limit 1;
+----
+2 2
+
+
+# Tear down test_filter_with_limit table:
+statement ok
+DROP TABLE test_filter_with_limit;
+
+# Tear down src_table table:
+statement ok
+DROP TABLE src_table;
+
+
+query I
+COPY (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10))
+TO 'test_files/scratch/push_down_filter_parquet/t.parquet'
+STORED AS PARQUET;
+----
+10
+
+statement ok
+CREATE EXTERNAL TABLE t
+(
+ a INT
+)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/push_down_filter_parquet/t.parquet';
+
+
+# The predicate should not have a column cast when the value is a valid i32
+query TT
+explain select a from t where a = '100';
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]},
projection=[a], file_type=parquet, predicate=a@0 = 100,
pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <=
a_max@1, required_guarantees=[a in (100)]
+
+# The predicate should not have a column cast when the value is a valid i32
+query TT
+explain select a from t where a != '100';
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]},
projection=[a], file_type=parquet, predicate=a@0 != 100,
pruning_predicate=a_null_count@2 != row_count@3 AND (a_min@0 != 100 OR 100 !=
a_max@1), required_guarantees=[a not in (100)]
+
+# The predicate should still have the column cast when the value is a NOT
valid i32
+query TT
+explain select a from t where a = '99999999999';
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99999999999
+
+# The predicate should still have the column cast when the value is a NOT
valid i32
+query TT
+explain select a from t where a = '99.99';
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) = 99.99
+
+# The predicate should still have the column cast when the value is a NOT
valid i32
+query TT
+explain select a from t where a = '';
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8) =
+
+# The predicate should not have a column cast when the operator is = or != and
the literal can be round-trip casted without losing information.
+query TT
+explain select a from t where cast(a as string) = '100';
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]},
projection=[a], file_type=parquet, predicate=a@0 = 100,
pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= 100 AND 100 <=
a_max@1, required_guarantees=[a in (100)]
+
+# The predicate should still have the column cast when the literal alters its
string representation after round-trip casting (leading zero lost).
+query TT
+explain select a from t where CAST(a AS string) = '0123';
+----
+physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/t.parquet]]},
projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
+
+
+# Test dynamic filter pushdown with swapped join inputs (issue #17196)
+# Create tables with different sizes to force join input swapping
+statement ok
+copy (select i as k from generate_series(1, 100) t(i)) to
'test_files/scratch/push_down_filter_parquet/small_table.parquet';
+
+statement ok
+copy (select i as k, i as v from generate_series(1, 1000) t(i)) to
'test_files/scratch/push_down_filter_parquet/large_table.parquet';
+
+statement ok
+create external table small_table stored as parquet location
'test_files/scratch/push_down_filter_parquet/small_table.parquet';
+
+statement ok
+create external table large_table stored as parquet location
'test_files/scratch/push_down_filter_parquet/large_table.parquet';
+
+# Test that dynamic filter is applied to the correct table after join input
swapping
+# The small_table should be the build side, large_table should be the probe
side with dynamic filter
+query TT
+explain select * from small_table join large_table on small_table.k =
large_table.k where large_table.v >= 50;
+----
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/small_table.parquet]]},
projection=[k], file_type=parquet
+03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/large_table.parquet]]},
projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [
empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50,
required_guarantees=[]
+
+statement ok
+drop table small_table;
+
+statement ok
+drop table large_table;
+
+statement ok
+drop table t;
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
new file mode 100644
index 0000000000..ca4a30fa96
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/push_down_filter_regression.slt
@@ -0,0 +1,200 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test push down filter
+
+# Regression test for https://github.com/apache/datafusion/issues/17188
+query I
+COPY (select i as k from generate_series(1, 10000000) as t(i))
+TO 'test_files/scratch/push_down_filter_regression/t1.parquet'
+STORED AS PARQUET;
+----
+10000000
+
+query I
+COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
+TO 'test_files/scratch/push_down_filter_regression/t2.parquet'
+STORED AS PARQUET;
+----
+10000000
+
+statement ok
+create external table t1 stored as parquet location
'test_files/scratch/push_down_filter_regression/t1.parquet';
+
+statement ok
+create external table t2 stored as parquet location
'test_files/scratch/push_down_filter_regression/t2.parquet';
+
+# The failure before https://github.com/apache/datafusion/pull/17197 was
non-deterministic and random
+# So we'll run the same query a couple of times just to have more certainty
it's fixed
+# Sorry about the spam in this slt test...
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+query III rowsort
+select *
+from t1
+join t2 on t1.k = t2.k
+where v = 1 or v = 10000000
+order by t1.k, t2.v;
+----
+1 1 1
+10000000 10000000 10000000
+
+# Regression test for https://github.com/apache/datafusion/issues/17512
+
+query I
+COPY (
+ SELECT arrow_cast('2025-01-01T00:00:00Z'::timestamptz,
'Timestamp(Microsecond, Some("UTC"))') AS start_timestamp
+)
+TO 'test_files/scratch/push_down_filter_regression/17512.parquet'
+STORED AS PARQUET;
+----
+1
+
+statement ok
+CREATE EXTERNAL TABLE records STORED AS PARQUET LOCATION
'test_files/scratch/push_down_filter_regression/17512.parquet';
+
+query I
+SELECT 1
+FROM (
+ SELECT start_timestamp
+ FROM records
+ WHERE start_timestamp <= '2025-01-01T00:00:00Z'::timestamptz
+) AS t
+WHERE t.start_timestamp::time < '00:00:01'::time;
+----
+1
+
+# Test aggregate dynamic filter pushdown
+# Note: most of the test coverage lives in
`datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs`
+# , to compare dynamic filter content easier. Here the tests are simple
end-to-end
+# exercises.
+
+statement ok
+set datafusion.explain.format = 'indent';
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+statement ok
+set datafusion.optimizer.enable_dynamic_filter_pushdown = true;
+
+statement ok
+set datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown = true;
+
+statement ok
+create external table agg_dyn_test stored as parquet location
'../core/tests/data/test_statistics_per_partition';
+
+# Expect dynamic filter available inside data source
+query TT
+explain select max(id) from agg_dyn_test where id > 1;
+----
+physical_plan
+01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
+04)------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2
[...]
+
+query I
+select max(id) from agg_dyn_test where id > 1;
+----
+4
+
+# Expect dynamic filter available inside data source
+query TT
+explain select max(id) from agg_dyn_test where (id+1) > 1;
+----
+physical_plan
+01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id)]
+04)------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2
[...]
+
+# Expect dynamic filter available inside data source
+query TT
+explain select max(id), min(id) from agg_dyn_test where id < 10;
+----
+physical_plan
+01)AggregateExec: mode=Final, gby=[], aggr=[max(agg_dyn_test.id),
min(agg_dyn_test.id)]
+02)--CoalescePartitionsExec
+03)----AggregateExec: mode=Partial, gby=[], aggr=[max(agg_dyn_test.id),
min(agg_dyn_test.id)]
+04)------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo2
[...]
+
+# Dynamic filter should not be available for grouping sets
+query TT
+explain select max(id) from agg_dyn_test where id < 10
+group by grouping sets ((), (id))
+----
+physical_plan
+01)ProjectionExec: expr=[max(agg_dyn_test.id)@2 as max(agg_dyn_test.id)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as
__grouping_id], aggr=[max(agg_dyn_test.id)]
+03)----RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 2),
input_partitions=2
+04)------AggregateExec: mode=Partial, gby=[(NULL as id), (id@0 as id)],
aggr=[max(agg_dyn_test.id)]
+05)--------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet],
[WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet,
WORKSPACE_ROOT/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQ
[...]
+
+statement ok
+drop table agg_dyn_test;
+
+statement ok
+drop table t1;
+
+statement ok
+drop table t2;
diff --git a/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt
b/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt
new file mode 100644
index 0000000000..58fe24e2e2
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/push_down_filter_unnest.slt
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test push down filter
+
+statement ok
+set datafusion.explain.physical_plan_only = true;
+
+statement ok
+CREATE TABLE IF NOT EXISTS v AS VALUES(1,[1,2,3]),(2,[3,4,5]);
+
+query I
+select uc2 from (select unnest(column2) as uc2, column1 from v) where column1
= 2;
+----
+3
+4
+5
+
+# test push down filter for unnest with filter on non-unnest column
+# filter plan is pushed down into projection plan
+query TT
+explain select uc2 from (select unnest(column2) as uc2, column1 from v) where
column1 = 2;
+----
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
+02)--UnnestExec
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
+05)--------FilterExec: column1@0 = 2, projection=[column2@1]
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query I
+select uc2 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3;
+----
+4
+5
+
+# test push down filter for unnest with filter on unnest column
+query TT
+explain select uc2 from (select unnest(column2) as uc2, column1 from v) where
uc2 > 3;
+----
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2]
+02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------UnnestExec
+05)--------ProjectionExec: expr=[column2@0 as __unnest_placeholder(v.column2)]
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+select uc2, column1 from (select unnest(column2) as uc2, column1 from v)
where uc2 > 3 AND column1 = 2;
+----
+4 2
+5 2
+
+# Could push the filter (column1 = 2) down below unnest
+query TT
+explain select uc2, column1 from (select unnest(column2) as uc2, column1 from
v) where uc2 > 3 AND column1 = 2;
+----
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2,
column1@1 as column1]
+02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3
+03)----UnnestExec
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2),
column1@0 as column1]
+06)----------FilterExec: column1@0 = 2
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query II
+select uc2, column1 from (select unnest(column2) as uc2, column1 from v)
where uc2 > 3 OR column1 = 2;
+----
+3 2
+4 2
+5 2
+
+# only non-unnest filter in AND clause could be pushed down
+query TT
+explain select uc2, column1 from (select unnest(column2) as uc2, column1 from
v) where uc2 > 3 OR column1 = 2;
+----
+physical_plan
+01)ProjectionExec: expr=[__unnest_placeholder(v.column2,depth=1)@0 as uc2,
column1@1 as column1]
+02)--FilterExec: __unnest_placeholder(v.column2,depth=1)@0 > 3 OR column1@1 = 2
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------UnnestExec
+05)--------ProjectionExec: expr=[column2@1 as __unnest_placeholder(v.column2),
column1@0 as column1]
+06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table v;
+
+# test with unnest struct, should not push down filter
+statement ok
+CREATE TABLE d AS VALUES(1,[named_struct('a', 1, 'b',
2)]),(2,[named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6)]);
+
+query I?
+select * from (select column1, unnest(column2) as o from d) where o['a'] = 1;
+----
+1 {a: 1, b: 2}
+
+query TT
+explain select * from (select column1, unnest(column2) as o from d) where
o['a'] = 1;
+----
+physical_plan
+01)ProjectionExec: expr=[column1@0 as column1,
__unnest_placeholder(d.column2,depth=1)@1 as o]
+02)--FilterExec: __datafusion_extracted_1@0 = 1, projection=[column1@1,
__unnest_placeholder(d.column2,depth=1)@2]
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec:
expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as
__datafusion_extracted_1, column1@0 as column1,
__unnest_placeholder(d.column2,depth=1)@1 as
__unnest_placeholder(d.column2,depth=1)]
+05)--------UnnestExec
+06)----------ProjectionExec: expr=[column1@0 as column1, column2@1 as
__unnest_placeholder(d.column2)]
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table d;
+
+statement ok
+CREATE TABLE d AS VALUES (named_struct('a', 1, 'b', 2)), (named_struct('a', 3,
'b', 4)), (named_struct('a', 5, 'b', 6));
+
+query II
+select * from (select unnest(column1) from d) where
"__unnest_placeholder(d.column1).b" > 5;
+----
+5 6
+
+query TT
+explain select * from (select unnest(column1) from d) where
"__unnest_placeholder(d.column1).b" > 5;
+----
+physical_plan
+01)FilterExec: __unnest_placeholder(d.column1).b@1 > 5
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----UnnestExec
+04)------ProjectionExec: expr=[column1@0 as __unnest_placeholder(d.column1)]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+drop table d;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]